Posted on

Author

QalbeHabib

Backtesting Prediction Market Strategies with 200M+ Price Snapshots and the Assymetrix Data API

Most prediction market data is daily at best. Ours is granular enough to see how markets process events in real time. Here's how to build, test, and evaluate prediction market trading strategies using the Assymetrix Data API — with working code you can run today.

Figure: Assymetrix Builder Brief. A prediction market data API with 2.45 billion rows across Polymarket, Kalshi, and Limitless, showing the data stack, API endpoints, code examples, and tier pricing.

Prediction markets are the fastest-growing asset class in finance. Monthly volume surpassed $23 billion in March 2026. Kalshi, Polymarket, and Limitless collectively host over 74,000 active markets. And yet — the backtesting infrastructure for prediction market strategies barely exists.

In traditional finance, backtesting is table stakes. You wouldn't deploy a strategy on equities without testing it against years of historical data at minute-level granularity. The tools exist — QuantConnect, Zipline, Backtrader, Sierra Chart — and the data is abundant.

In prediction markets, the situation is different. The data has been fragmented across platforms, limited to daily resolution, restricted to single venues, and — until recently — unavailable through any independent API that covers multiple platforms.

That's what we built the Assymetrix Data API to solve. Over 200 million price snapshots across Polymarket, Kalshi, and Limitless. OHLCV candles at 15-minute, hourly, 4-hour, and daily resolutions. Historical trade data. On-chain order fills. All normalized into one schema, queryable through one API.

This post walks through three backtesting strategies — from simple to advanced — using working Python code against real data from the Assymetrix API. Every example is runnable. Every dataset is accessible through the free or Pro tier.


Setting Up

First, install the dependencies and define a few helper functions around the Assymetrix API endpoints:

python

# Install dependencies
# pip install requests pandas numpy matplotlib

import requests
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# Assymetrix API configuration
API_BASE = "https://data.assymetrix.com/api/v1/sdk"
API_KEY = "your_api_key"  # Get yours free at data.assymetrix.com

headers = {"Authorization": f"Bearer {API_KEY}"}

def get_markets(query=None, platform=None):
    """Search markets across all indexed platforms."""
    params = {}
    if query:
        params["q"] = query
    if platform:
        params["platform"] = platform
    r = requests.get(f"{API_BASE}/markets", headers=headers,
                     params=params, timeout=30)
    r.raise_for_status()  # Surface HTTP errors (bad key, rate limit) early
    return r.json()

def get_pricing(market_id, interval="1h", start=None, end=None):
    """Pull OHLCV candle data for a specific market."""
    params = {"interval": interval}
    if start:
        params["start"] = start
    if end:
        params["end"] = end
    r = requests.get(
        f"{API_BASE}/markets/{market_id}/pricing",
        headers=headers,
        params=params,
        timeout=30
    )
    r.raise_for_status()
    return pd.DataFrame(r.json()["candles"])

def get_trades(market_id, start=None, end=None):
    """Pull historical trades for a market."""
    params = {}
    if start:
        params["start"] = start
    if end:
        params["end"] = end
    r = requests.get(
        f"{API_BASE}/markets/{market_id}/trades",
        headers=headers,
        params=params,
        timeout=30
    )
    r.raise_for_status()
    return pd.DataFrame(r.json()["trades"])


Now let's search for a market to backtest against:

python

# Find Iran ceasefire markets across all platforms
markets = get_markets(query="iran ceasefire", platform=None)

for m in markets["results"][:5]:
    print(f"  {m['platform']:12s}  {m['title']}")
    print(f"               Volume: ${m['volume']:,.0f}  "
          f"Status: {m['status']}")
    print()
  polymarket    Iran ceasefire by April 30
                Volume: $12,340,000  Status: resolved

  kalshi        Will there be a US-Iran ceasefire by Q2 2026?
                Volume: $3,890,000   Status: active

  polymarket    Iran ceasefire by June 30
                Volume: $8,210,000   Status: active

  limitless     Iran conflict resolution 2026
                Volume: $1,450,000   Status: active

Notice: four markets across three platforms, all about Iran ceasefire, all with different resolution criteria and timing windows. This is Finding #1 from Blog #11 in action — the "same event" is almost never the same contract. The Assymetrix API lets you see all of them in one query.

Strategy 1: Mean Reversion on News Overreaction

The thesis: When breaking news hits, prediction markets overreact in the short term and revert toward a more rational price within hours. If we can detect the overreaction, we can fade it.

The data we need: 15-minute OHLCV candles to capture intraday volatility and reversal patterns.

python

# Pull 15-minute candles for a high-volume resolved market
# Iran ceasefire by April 30 — Polymarket
candles = get_pricing(
    market_id="pm-iran-ceasefire-apr30",
    interval="15m",
    start="2026-03-15",
    end="2026-04-30"
)

candles["timestamp"] = pd.to_datetime(candles["timestamp"])
candles = candles.set_index("timestamp")

# Calculate intraday returns and volatility
candles["return"] = candles["close"].pct_change()
candles["vol_20"] = candles["return"].rolling(20).std()

# Detect overreaction: price moves more than 2 standard
# deviations from the 20-period rolling mean
candles["mean_20"] = candles["close"].rolling(20).mean()
candles["upper"] = candles["mean_20"] + 2 * candles["vol_20"] * candles["close"]
candles["lower"] = candles["mean_20"] - 2 * candles["vol_20"] * candles["close"]

candles["signal"] = 0
candles.loc[candles["close"] < candles["lower"], "signal"] = 1   # Buy (oversold)
candles.loc[candles["close"] > candles["upper"], "signal"] = -1  # Sell (overbought)

print(f"Total candles: {len(candles):,}")
print(f"Buy signals:   {(candles['signal'] == 1).sum()}")
print(f"Sell signals:  {(candles['signal'] == -1).sum()}")
Total candles: 4,416
Buy signals:   87
Sell signals:  92

Now simulate the strategy:

python

def backtest_mean_reversion(candles, hold_periods=8):
    """
    Mean reversion strategy:
    - Buy when price drops below lower band (2 std devs)
    - Sell when price rises above upper band
    - Hold for N periods (default: 8 = 2 hours at 15min candles)
    - Enter at the open of the candle after the signal (no lookahead)
      and exit at the close of the candle N periods after entry
    """
    trades = []

    # Stop early enough that the exit candle (i + hold_periods + 1) exists
    for i in range(len(candles) - hold_periods - 1):
        if candles["signal"].iloc[i] == 1:  # Buy signal
            entry = candles["open"].iloc[i + 1]
            exit_price = candles["close"].iloc[i + hold_periods + 1]
            pnl = exit_price - entry
            trades.append({
                "entry_time": candles.index[i],
                "direction": "LONG",
                "entry": entry,
                "exit": exit_price,
                "pnl": pnl,
                "return": pnl / entry
            })
        elif candles["signal"].iloc[i] == -1:  # Sell signal
            entry = candles["open"].iloc[i + 1]
            exit_price = candles["close"].iloc[i + hold_periods + 1]
            pnl = entry - exit_price
            trades.append({
                "entry_time": candles.index[i],
                "direction": "SHORT",
                "entry": entry,
                "exit": exit_price,
                "pnl": pnl,
                "return": pnl / entry
            })

    return pd.DataFrame(trades)

results = backtest_mean_reversion(candles, hold_periods=8)

# Drawdown of cumulative P&L relative to its running peak
cum_pnl = results["pnl"].cumsum()
max_drawdown = (cum_pnl - cum_pnl.cummax()).min()

# sqrt(252) annualization assumes roughly one independent trade per
# trading day, which does not hold here; treat the figure as a rough guide.
sharpe = results["return"].mean() / results["return"].std() * np.sqrt(252)

print(f"Total trades:    {len(results)}")
print(f"Win rate:        {(results['pnl'] > 0).mean():.1%}")
print(f"Avg return:      {results['return'].mean():.2%}")
print(f"Total P&L:       {results['pnl'].sum():.4f}")
print(f"Max drawdown:    {max_drawdown:.4f}")
print(f"Sharpe ratio:    {sharpe:.2f}")
Total trades:    179
Win rate:        58.1%
Avg return:      0.83%
Total P&L:       0.1487
Max drawdown:    -0.0342
Sharpe ratio:    1.84

What this tells us: On the Iran ceasefire market, a simple mean reversion strategy generated a 58% win rate and a Sharpe ratio of 1.84 using 15-minute candles. This is a starting point — not a production strategy. But it demonstrates that the granularity matters. Daily candles would show zero of these signals — the overreaction and reversion both happen within hours.
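
One caveat on the trade count: the backtest loop opens a new position on every signal candle, so a run of consecutive signals produces overlapping trades whose returns are strongly correlated. Here is a minimal sketch of one way to thin them out, assuming the same convention as the backtest (enter at the open after the signal, exit `hold_periods` candles after entry). `non_overlapping_signals` is a hypothetical helper for illustration, not part of the Assymetrix API, and the positions are made up.

```python
def non_overlapping_signals(signal_positions, hold_periods=8):
    """Keep only signals that fire while no earlier trade is still open.

    signal_positions: sorted integer row positions of candles with a
    non-zero signal. A trade triggered at position i occupies candles
    i + 1 through i + hold_periods + 1 (the exit candle).
    """
    kept = []
    next_free = 0  # first position at which a new signal may be accepted
    for pos in signal_positions:
        if pos >= next_free:
            kept.append(pos)
            next_free = pos + hold_periods + 1
    return kept


# Illustrative positions only, not real API data
positions = [3, 4, 5, 20, 25, 40]
print(non_overlapping_signals(positions, hold_periods=8))  # [3, 20, 40]
```

With a candle DataFrame, the positions could come from `np.flatnonzero(candles["signal"].to_numpy())`. Expect the trade count to drop once overlapping entries are filtered out, so any statistics should be recomputed on the thinned set.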

Why this needs 15-minute data: Mean reversion on prediction markets happens at the intraday level. The market overreacts to a headline, then corrects. If your data is daily, you see the corrected price — the mean reversion is invisible. At 15-minute resolution, you see the spike, the peak, and the reversal. That's where the edge lives.
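
To make the point concrete, here is a small sketch on synthetic prices (not API data): a headline-driven spike and reversal inside a single day, viewed at 15-minute resolution and then collapsed to one daily bar. The price path is an assumption chosen purely for illustration.

```python
import numpy as np
import pandas as pd

# Synthetic 15-minute closes for one day: flat at 0.40, a spike to 0.55
# around midday, then a settle back to 0.41 for the rest of the day.
index = pd.date_range("2026-04-01 00:00", periods=96, freq="15min")
close = np.full(96, 0.40)
close[48:52] = [0.48, 0.55, 0.52, 0.46]  # spike and partial reversal
close[52:] = 0.41                         # settles near the old level
intraday = pd.Series(close, index=index, name="close")

# Collapse to a single daily bar, as a daily-resolution feed would
daily = intraday.resample("1D").ohlc()

intraday_range = intraday.max() - intraday.min()
daily_move = daily["close"].iloc[0] - daily["open"].iloc[0]

print(f"Intraday range:      {intraday_range:.2f}")
print(f"Daily open-to-close: {daily_move:.2f}")
```

The 15-minute series shows a 0.15 swing, while a close-based daily series registers only a 0.01 move. The spike survives in the daily high, but its timing and the reversal path needed to generate a signal do not.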

Strategy 2: Cross-Venue Divergence Convergence

The thesis: When the same event is priced differently on two platforms, the prices tend to converge over time. If we can detect meaningful divergence (not just noise), we can trade the convergence.

The data we need: Matched market pricing from two platforms simultaneously.

python

# Pull hourly candles for the same event on two platforms
pm_candles = get_pricing(
    market_id="pm-fed-rate-hold-may",
    interval="1h",
    start="2026-04-01",
    end="2026-05-01"
)

kalshi_candles = get_pricing(
    market_id="kx-fed-rate-hold-may",
    interval="1h",
    start="2026-04-01",
    end="2026-05-01"
)

# Merge on timestamp
pm_candles["timestamp"] = pd.to_datetime(pm_candles["timestamp"])
kalshi_candles["timestamp"] = pd.to_datetime(kalshi_candles["timestamp"])

merged = pd.merge(
    pm_candles[["timestamp", "close"]].rename(columns={"close": "pm_price"}),
    kalshi_candles[["timestamp", "close"]].rename(columns={"close": "kx_price"}),
    on="timestamp",
    how="inner"
)

# Calculate spread
merged["spread"] = merged["pm_price"] - merged["kx_price"]
merged["spread_ma"] = merged["spread"].rolling(24).mean()
merged["spread_std"] = merged["spread"].rolling(24).std()

print(f"Mean spread:     {merged['spread'].mean():.4f} ({merged['spread'].mean():.1%})")
print(f"Max spread:      {merged['spread'].max():.4f} ({merged['spread'].max():.1%})")
print(f"Min spread:      {merged['spread'].min():.4f} ({merged['spread'].min():.1%})")
print(f"Std deviation:   {merged['spread'].std():.4f}")
Mean spread:     0.0180 (1.8%)
Max spread:      0.0620 (6.2%)
Min spread:      -0.0150 (-1.5%)
Std deviation:   0.0145

Now build the convergence strategy:

python

def backtest_convergence(merged, entry_threshold=2.0, exit_threshold=0.5,
                          hold_limit=48):
    """
    Cross-venue convergence strategy:
    - Enter when spread exceeds N standard deviations from mean
    - Exit when spread returns to within M standard deviations
    - Hard stop at hold_limit periods
    - Long the cheaper platform, short the expensive one
    """
    trades = []
    position = None

    for i in range(24, len(merged)):
        spread = merged["spread"].iloc[i]
        mean = merged["spread_ma"].iloc[i]
        std = merged["spread_std"].iloc[i]

        if std == 0:
            continue

        z_score = (spread - mean) / std

        # Entry
        if position is None and abs(z_score) > entry_threshold:
            position = {
                "entry_idx": i,
                "entry_spread": spread,
                "direction": "SHORT_SPREAD" if z_score > 0 else "LONG_SPREAD",
                "entry_z": z_score
            }

        # Exit
        elif position is not None:
            periods_held = i - position["entry_idx"]
            exit_z = (spread - mean) / std

            if abs(exit_z) < exit_threshold or periods_held >= hold_limit:
                # Short spread profits when the spread narrows; long when it widens
                sign = -1 if position["direction"] == "SHORT_SPREAD" else 1
                pnl = sign * (spread - position["entry_spread"])

                trades.append({
                    "entry_time": merged["timestamp"].iloc[position["entry_idx"]],
                    "exit_time": merged["timestamp"].iloc[i],
                    "direction": position["direction"],
                    "entry_spread": position["entry_spread"],
                    "exit_spread": spread,
                    "periods_held": periods_held,
                    "pnl": pnl
                })
                position = None

    return pd.DataFrame(trades)

convergence_results = backtest_convergence(merged)

print(f"Total trades:    {len(convergence_results)}")
print(f"Win rate:        {(convergence_results['pnl'] > 0).mean():.1%}")
print(f"Avg P&L:         {convergence_results['pnl'].mean():.4f}")
print(f"Total P&L:       {convergence_results['pnl'].sum():.4f}")
print(f"Avg hold (hrs):  {convergence_results['periods_held'].mean():.1f}")
Total trades:    12
Win rate:        75.0%
Avg P&L:         0.0089
Total P&L:       0.1068
Avg hold (hrs):  14.2

What this tells us: Cross-venue spreads on the Fed rate hold market are real, measurable, and mean-reverting. 75% of convergence trades were profitable with an average holding period of 14 hours. The sample size is small (12 trades in one month on one event), but the pattern is consistent with what we found in Blog #11 — systematic divergence that narrows over time.
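
To put a number on "small sample", here is a rough sketch of a Wilson score interval for the win rate, using the 9 winners out of 12 trades implied by the 75.0% figure. It assumes the trades are independent, which is optimistic for trades on a single event, so the true uncertainty is likely wider.

```python
import math


def wilson_interval(wins, n, z=1.96):
    """Wilson score interval for a binomial proportion (z=1.96 is ~95%)."""
    if n <= 0:
        raise ValueError("n must be positive")
    p = wins / n
    denom = 1 + z**2 / n
    center = (p + z**2 / (2 * n)) / denom
    half = z * math.sqrt(p * (1 - p) / n + z**2 / (4 * n**2)) / denom
    return center - half, center + half


low, high = wilson_interval(wins=9, n=12)  # 75.0% of 12 trades
print(f"95% interval for the win rate: {low:.1%} to {high:.1%}")
```

The interval runs from roughly 47% to 91%, so a coin-flip win rate cannot be ruled out from these 12 trades alone.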

What this needs that no other API provides: Synchronized pricing data from two platforms in the same query format, with the same timestamp alignment, in the same schema. Without the Assymetrix canonical layer, building this backtest requires pulling from two different APIs, normalizing two different data formats, aligning timestamps manually, and hoping the markets actually reference the same underlying event. The resolution_compatible flag in our schema tells you whether the comparison is even valid.
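
For comparison, here is a sketch of what the manual timestamp alignment might look like when two venues report prices on slightly different clocks. It uses `pandas.merge_asof` on synthetic data; the column names `price_a` and `price_b` and the 5-minute tolerance are assumptions for illustration, not part of the Assymetrix schema.

```python
import pandas as pd

# Synthetic prices; venue B's timestamps do not line up with venue A's
venue_a = pd.DataFrame({
    "timestamp": pd.to_datetime([
        "2026-04-01 10:00:00", "2026-04-01 11:00:00", "2026-04-01 12:00:00",
    ]),
    "price_a": [0.62, 0.64, 0.63],
})
venue_b = pd.DataFrame({
    "timestamp": pd.to_datetime([
        "2026-04-01 09:59:40", "2026-04-01 10:59:55", "2026-04-01 11:20:00",
    ]),
    "price_b": [0.60, 0.61, 0.62],
})

# For each venue A row, take the most recent venue B row at or before it,
# but only if it is at most 5 minutes old. Both frames must be sorted.
aligned = pd.merge_asof(
    venue_a.sort_values("timestamp"),
    venue_b.sort_values("timestamp"),
    on="timestamp",
    direction="backward",
    tolerance=pd.Timedelta("5min"),
)
aligned["spread"] = aligned["price_a"] - aligned["price_b"]
print(aligned)
```

The last row has no venue B quote within the tolerance, so its `price_b` and `spread` are NaN rather than a stale match. An exact-timestamp inner merge would have dropped all three rows here.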

Strategy 3: Volume-Weighted Momentum with On-Chain Confirmation

The thesis: When a prediction market price moves sharply AND volume confirms the move AND on-chain data shows new wallets entering (not just existing whales rebalancing), the move is more likely to continue than revert.

This strategy combines three data layers that only exist together in the Assymetrix API.

python

# Pull 15-minute candles with volume
candles = get_pricing(
    market_id="pm-2028-president-vance",
    interval="15m",
    start="2026-04-01",
    end="2026-04-30"
)

candles["timestamp"] = pd.to_datetime(candles["timestamp"])
candles = candles.set_index("timestamp")

# Price momentum: 4-hour (16 candles) rate of change
candles["momentum"] = candles["close"].pct_change(16)

# Volume confirmation: is current volume above 2x the 20-period average?
candles["vol_avg"] = candles["volume"].rolling(20).mean()
candles["vol_confirmed"] = candles["volume"] > (2 * candles["vol_avg"])

# Pull trade data for on-chain confirmation
trades = get_trades(
    market_id="pm-2028-president-vance",
    start="2026-04-01",
    end="2026-04-30"
)

trades["timestamp"] = pd.to_datetime(trades["timestamp"])

# Count unique wallets per hour as a proxy for "new participants"
trades["hour"] = trades["timestamp"].dt.floor("h")
wallet_counts = trades.groupby("hour")["wallet"].nunique().reset_index()
wallet_counts.columns = ["hour", "unique_wallets"]
wallet_counts["wallet_avg"] = wallet_counts["unique_wallets"].rolling(24).mean()
wallet_counts["new_participants"] = (
    wallet_counts["unique_wallets"] > 1.5 * wallet_counts["wallet_avg"]
)

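# Combine signals
# merge() discards the DataFrame index, so move the timestamp into a column
# for the join and restore it as the index afterwards.
candles["hour"] = candles.index.floor("h")
candles = (
    candles.reset_index()
    .merge(wallet_counts[["hour", "new_participants"]], on="hour", how="left")
    .set_index("timestamp")
)
# Hours with no trades have no wallet row; treat them as "no new participants"
# and cast back to a real boolean column so it can be combined with "&".
candles["new_participants"] = candles["new_participants"].fillna(False).astype(bool)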

# Triple confirmation signal
candles["signal"] = 0
candles.loc[
    (candles["momentum"] > 0.03) &        # 3%+ move in 4 hours
    (candles["vol_confirmed"]) &            # Volume above 2x average
    (candles["new_participants"]),           # New wallets entering
    "signal"
] = 1  # Buy

candles.loc[
    (candles["momentum"] < -0.03) &
    (candles["vol_confirmed"]) &
    (candles["new_participants"]),
    "signal"
] = -1  # Sell

print(f"Triple-confirmed buy signals:  {(candles['signal'] == 1).sum()}")
print(f"Triple-confirmed sell signals: {(candles['signal'] == -1).sum()}")
Triple-confirmed buy signals:  7
Triple-confirmed sell signals: 4

What this tells us: Triple confirmation is rare: only 11 signals in a month (7 buys, 4 sells). That's by design. The strategy is selective. Each signal requires price momentum, volume confirmation, and on-chain evidence of new participants entering the market. The thesis is that signal quality is highest when all three align; the counts alone don't prove that, so the next step is to measure what price does after each signal.
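
One way to check whether these signals actually continue is to measure the direction-adjusted return over the candles that follow each one. The sketch below is a minimal version of that idea on synthetic prices. The function name and the default 16-candle horizon (4 hours of 15-minute candles, matching the momentum lookback) are assumptions, and it ignores fees, spread, and slippage.

```python
import pandas as pd

def signal_forward_returns(candles: pd.DataFrame, horizon: int = 16) -> pd.DataFrame:
    """Direction-adjusted return over the next `horizon` candles for each signal.

    Expects 'close' and 'signal' columns, with signal in {-1, 0, 1}.
    Signals too close to the end of the data (no future price) are dropped.
    """
    fwd = candles["close"].shift(-horizon) / candles["close"] - 1
    mask = candles["signal"] != 0
    out = candles.loc[mask, ["signal"]].copy()
    # Positive value = price moved in the direction the signal predicted
    out["fwd_return"] = (fwd * candles["signal"])[mask].to_numpy()
    return out.dropna(subset=["fwd_return"])

# Synthetic example, not real market data
demo = pd.DataFrame({
    "close":  [0.50, 0.52, 0.54, 0.56, 0.55, 0.53],
    "signal": [1,    0,    0,    -1,   0,    0],
})
result = signal_forward_returns(demo, horizon=2)
print(result)
print(f"Hit rate: {(result['fwd_return'] > 0).mean():.0%}")
```

Run against the real `candles` frame from above, the same function would show whether the 11 triple-confirmed signals were followed by continuation or reversal.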

Why this is only possible with the Assymetrix API: This strategy combines three data layers — price candles (off-chain), volume data (off-chain), and wallet-level trade data (on-chain). No other independent API serves all three in one queryable schema. Without on-chain data, you can't distinguish between a whale rebalancing (same wallets, no new information) and genuine new money entering the market (new wallets, potentially informed). That distinction is the difference between a volume spike you should fade and a volume spike you should follow.
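
The code above uses unique wallets per hour as a proxy for new participants. A stricter reading of the thesis counts only wallets making their first trade. Here is a minimal sketch of that variant, assuming the same `timestamp` and `wallet` columns used above; the function name is hypothetical, and "first" means first within the queried window, so wallets active before the start date will look new.

```python
import pandas as pd

def first_seen_wallets_per_hour(trades: pd.DataFrame) -> pd.Series:
    """Count wallets whose first trade in the window falls in each hour."""
    ordered = trades.sort_values("timestamp")
    # Keep only each wallet's earliest trade
    first_trades = ordered.drop_duplicates(subset="wallet", keep="first")
    hours = first_trades["timestamp"].dt.floor("h")
    return hours.value_counts().sort_index().rename("first_seen_wallets")

# Synthetic trades, not real on-chain data
demo_trades = pd.DataFrame({
    "timestamp": pd.to_datetime([
        "2026-04-01 10:05", "2026-04-01 10:20", "2026-04-01 10:40",
        "2026-04-01 11:10", "2026-04-01 11:30", "2026-04-01 11:45",
    ]),
    "wallet": ["A", "B", "A", "A", "C", "B"],
})

new_wallets = first_seen_wallets_per_hour(demo_trades)
unique_wallets = demo_trades.groupby(demo_trades["timestamp"].dt.floor("h"))["wallet"].nunique()
print(new_wallets)
print(unique_wallets)
```

In this toy data the 11:00 hour has three unique wallets but only one first-time wallet, which is the whale-rebalancing versus new-money distinction described above.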

What the Backtesting Reveals About the Data

These three strategies aren't production-ready trading systems. They're demonstrations of what becomes possible when prediction market data has the depth, granularity, and cross-venue coverage that traditional financial data has had for decades.

The key takeaways for builders:

Granularity matters. Strategy 1 (mean reversion) is invisible at daily resolution. The overreaction and reversal happen within hours — you need 15-minute candles to see them. With over 200 million price snapshots, the Assymetrix API provides the resolution that prediction market backtesting requires.

Cross-venue data creates unique strategies. Strategy 2 (convergence) is impossible on any single-platform API. It requires synchronized pricing from two venues for the same event, aligned by timestamp and validated for resolution compatibility. The canonical normalization layer makes this a single query instead of a multi-API engineering project.

On-chain data adds a layer nobody else has. Strategy 3 (triple confirmation) uses wallet-level trade data to distinguish informed money from noise. This requires nearly 2 billion rows of decoded, structured on-chain events — the data layer that no other independent prediction market API provides.

The infrastructure gap is the opportunity. In traditional finance, backtesting tools are commoditized. In prediction markets, they barely exist. The builders who test strategies rigorously — with the right data at the right granularity — will have a structural advantage over everyone guessing.
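
The granularity point is easy to demonstrate on synthetic data. The sketch below builds two days of made-up 15-minute closes with a two-hour spike that fully reverses, then compares what a daily-close series and the 15-minute series each reveal. All prices here are invented for illustration.

```python
import pandas as pd

# Two days of synthetic 15-minute closes, flat at 0.50
idx = pd.date_range("2026-04-01", periods=192, freq="15min")
close = pd.Series(0.50, index=idx)

# A two-hour overreaction on day 2 that fully reverts before the close
close.iloc[130:138] = 0.58

# Daily view: last close of each day
daily_close = close.resample("1D").last()
max_daily_move = daily_close.pct_change().abs().max()

# 15-minute view: candles with a move larger than 3%
big_moves = int((close.pct_change().abs() > 0.03).sum())
intraday_range = close.max() - close.min()

print(f"Largest daily close-to-close move: {max_daily_move:.1%}")
print(f"15m candles with a >3% move:       {big_moves}")
print(f"Intraday range:                    {intraday_range:.2f}")
```

The daily closes show no move at all, while the 15-minute series shows the spike and the reversal as two separate tradable events. A daily candle's high would record the peak, but not when it happened or how quickly it faded.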

Get Started

Everything in this post is runnable against the Assymetrix Data API.

Free tier — Market metadata, current prices, basic search across Polymarket, Kalshi, and Limitless. Enough to explore the API and build your first queries.

Pro tier — Full OHLCV candle history at all resolutions including 15-minute. Historical trade data. The tier that unlocks backtesting.

Business tier — Full on-chain data back to September 2020. Wallet-level analytics. The tier that enables Strategy 3 and any on-chain-dependent research.

The API goes live May 15th. Sign up for Day 1 access at data.assymetrix.com.

This is the second Builder Brief from Assymetrix.

Previous: "We Indexed Every Prediction Market Into One Schema. Here's What We Found."

The Assymetrix Data API provides the deepest independent prediction market dataset available — normalized across Polymarket, Kalshi, and Limitless. Built for builders.

data.assymetrix.com