Data API

Real-time market data, Level 2 orderbook analysis, and institutional-grade market microstructure tools.

Real-time Data Management

Real-time Client

class ProjectXRealtimeClient(jwt_token, account_id, user_hub_url=None, market_hub_url=None, config=None)[source]

Bases: ConnectionManagementMixin, EventHandlingMixin, HealthMonitoringMixin, SubscriptionsMixin, TaskManagerMixin

Async real-time client for ProjectX Gateway API WebSocket connections.

CRITICAL FIXES (v3.3.1): This class now includes comprehensive safety mechanisms to prevent deadlocks, memory leaks, and connection failures in production environments.

This class provides an async interface for ProjectX SignalR connections and forwards all events to registered managers. It does NOT cache data or perform business logic - that’s handled by the specialized managers.

Safety Features (v3.3.1):
  • Task Lifecycle Management: Automatic tracking and cleanup of async tasks

  • Deadlock Prevention: Timeout-based token refresh with state recovery

  • Memory Leak Protection: WeakSet-based task tracking prevents accumulation

  • Connection Recovery: Automatic rollback to stable state on failures

  • Health Monitoring: Comprehensive connection health tracking and automatic recovery

Features:
  • Async SignalR WebSocket connections to ProjectX Gateway hubs

  • Event forwarding to registered async managers

  • Automatic reconnection with exponential backoff

  • JWT token refresh and reconnection with deadlock prevention

  • Real-time connection health monitoring with heartbeat mechanism

  • Latency tracking and performance metrics

  • Automatic reconnection based on health thresholds

  • Async event callbacks

  • Thread-safe event processing and callback execution

  • Comprehensive connection statistics and health tracking

  • NEW: Centralized task management prevents memory leaks

  • NEW: Connection state recovery on failures

  • NEW: Health-based automatic reconnection triggers

Architecture:
  • Pure event forwarding (no business logic)

  • No data caching (handled by managers)

  • No payload parsing (managers handle ProjectX formats)

  • Minimal stateful operations

  • Mixin-based design for modular functionality

  • NEW: TaskManagerMixin provides automatic task cleanup

Real-time Hubs (per ProjectX Gateway docs):
  • User Hub: Account, position, and order updates

  • Market Hub: Quote, trade, and market depth data

Connection Management:
  • Dual-hub SignalR connections with automatic reconnection

  • JWT token authentication via URL query parameter (required by ProjectX Gateway)

  • Real-time health monitoring with heartbeat latency tracking

  • Automatic health-based reconnection when performance degrades

  • Connection error handling and performance metrics

  • Thread-safe operations with proper lock management

  • NEW: Timeout-based operations prevent indefinite blocking

  • NEW: Connection state recovery preserves subscriptions

  • NEW: Health thresholds trigger automatic recovery

Event Processing:
  • Cross-thread event scheduling for asyncio compatibility

  • Support for both async and sync callbacks

  • Error isolation to prevent callback failures

  • Event statistics and flow monitoring

Task Management (v3.3.1):
  • All background tasks are automatically tracked

  • WeakSet-based tracking prevents memory leaks

  • Graceful cancellation with configurable timeouts

  • Error collection and reporting for failed tasks

  • Statistics available via get_task_stats()

Example

>>> # V3.1: Use TradingSuite for automatic real-time management
>>> suite = await TradingSuite.create("MNQ", timeframes=["1min"])
>>> # V3.1: Access real-time client if needed
>>> print(f"Connected: {suite.realtime_client.is_connected()}")
>>>
>>> # V3.3.1: Task management statistics
>>> task_stats = suite.realtime_client.get_task_stats()
>>> print(f"Active tasks: {task_stats['pending_tasks']}")
>>> print(f"Failed tasks: {task_stats['failed_tasks']}")
>>>
>>> # V3.3.1: Health monitoring (NEW)
>>> health_status = await suite.realtime_client.get_health_status()
>>> print(f"Health Score: {health_status['health_score']}/100")
>>> print(f"User Hub Latency: {health_status['user_hub_latency_ms']}ms")
>>> print(f"Market Hub Latency: {health_status['market_hub_latency_ms']}ms")
>>>
>>> # V3.3.1: Configure health monitoring
>>> await suite.realtime_client.configure_health_monitoring(
...     heartbeat_interval=5.0,  # Check every 5 seconds
...     health_threshold=75.0,  # Reconnect if health < 75
...     latency_threshold_ms=1000,  # Alert if latency > 1000ms
... )
>>>
>>> # V3.1: Register callbacks via suite's event bus
>>> from project_x_py import EventType
>>> async def handle_position(event):
...     data = event.data
...     print(f"Position: {data.get('contractId')} - {data.get('netPos')}")
>>> await suite.on(EventType.POSITION_UPDATE, handle_position)
>>>
>>> # V3.3.1: Safe token refresh with deadlock prevention
>>> try:
...     success = await suite.realtime_client.update_jwt_token(
...         new_token, timeout=30.0
...     )
...     if not success:
...         print(
...             "Token refresh failed, connection recovered to original state"
...         )
... except TimeoutError:
...     print("Token refresh timed out, deadlock prevented")
Event Types (per ProjectX Gateway docs):

User Hub: GatewayUserAccount, GatewayUserPosition, GatewayUserOrder, GatewayUserTrade Market Hub: GatewayQuote, GatewayDepth, GatewayTrade

Integration:
  • AsyncPositionManager handles position events and caching

  • AsyncOrderManager handles order events and tracking

  • AsyncRealtimeDataManager handles market data and caching

  • This client only handles connections and event forwarding

Production Reliability (v3.3.1):
  • Zero memory leaks from task accumulation

  • No deadlocks during token refresh operations

  • Automatic recovery from connection failures

  • Comprehensive error handling and logging

  • Performance monitoring through task statistics

__init__(jwt_token, account_id, user_hub_url=None, market_hub_url=None, config=None)[source]

Initialize async ProjectX real-time client with configurable SignalR connections.

Creates a dual-hub SignalR client for real-time ProjectX Gateway communication. Handles both user-specific events (positions, orders) and market data (quotes, trades).

Parameters:
  • jwt_token (str) – JWT authentication token from AsyncProjectX.authenticate(). Must be valid and not expired for successful connection.

  • account_id (str) – ProjectX account ID for user-specific subscriptions. Used to filter position, order, and trade events.

  • user_hub_url (str, optional) – Override URL for user hub endpoint. If provided, takes precedence over config URL. Defaults to None (uses config or default).

  • market_hub_url (str, optional) – Override URL for market hub endpoint. If provided, takes precedence over config URL. Defaults to None (uses config or default).

  • config (ProjectXConfig, optional) – Configuration object with hub URLs. Provides default URLs if direct URLs not specified. Defaults to None (uses TopStepX defaults).

URL Priority:
  1. Direct parameters (user_hub_url, market_hub_url)

  2. Config URLs (config.user_hub_url, config.market_hub_url)

  3. Default TopStepX endpoints

Example

>>> # V3: Using factory function (recommended)
>>> client = await create_realtime_client(
...     jwt_token=client.get_session_token(),
...     account_id=str(client.get_account_info().id),
... )
>>> # V3: Using direct instantiation with default endpoints
>>> client = ProjectXRealtimeClient(jwt_token=jwt_token, account_id="12345")
>>>
>>> # V3: Using custom config for different environments
>>> from project_x_py.models import ProjectXConfig
>>> config = ProjectXConfig(
...     user_hub_url="https://gateway.topstepx.com/hubs/user",
...     market_hub_url="https://gateway.topstepx.com/hubs/market",
... )
>>> client = ProjectXRealtimeClient(
...     jwt_token=jwt_token, account_id="12345", config=config
... )
>>>
>>> # V3: Override specific URL for testing
>>> client = ProjectXRealtimeClient(
...     jwt_token=jwt_token,
...     account_id="12345",
...     market_hub_url="https://test.topstepx.com/hubs/market",
... )

Note

  • JWT token is passed via URL query parameter (required by ProjectX Gateway)

  • Both hubs must connect successfully for full functionality

  • SignalR connections are established lazily on connect()

Orderbook Analysis

Note

Level 2 OrderBook API: For comprehensive Level 2 orderbook analysis including iceberg detection, market imbalance analysis, and institutional-grade market microstructure tools, see the dedicated OrderBook API module.

class OrderBook(instrument, event_bus, project_x=None, timezone_str='America/Chicago', config=None)[source]

Bases: OrderBookBase

Async Level 2 Orderbook with comprehensive market analysis.

This class combines all orderbook functionality into a single interface, providing a unified API for accessing real-time market depth data, advanced analytics, detection algorithms, and volume profiling. It uses a component-based architecture where specialized functionality is delegated to dedicated components while maintaining a simple, cohesive interface for the client code.

Key Components:
  • realtime_handler: Manages WebSocket connections and real-time data processing

  • analytics: Provides market analytics (imbalance, depth, delta, liquidity)

  • detection: Implements detection algorithms (iceberg, clusters)

  • profile: Handles volume profiling and support/resistance analysis

  • memory_manager: Manages memory usage and cleanup tasks

Thread Safety:

All methods are thread-safe and can be called concurrently from multiple asyncio tasks. Data consistency is maintained through internal locks.

Memory Management:

The orderbook implements automatic memory management through the MemoryManager component, which periodically cleans up historical data based on configurable parameters to prevent memory leaks during long-running sessions.

Real-time Features:
  • WebSocket-based Level 2 market depth updates

  • Immediate trade execution detection and classification

  • Real-time spread and price level monitoring

  • Event-driven callback system for custom logic

  • Automatic data validation and error handling

Analytics Capabilities:
  • Market imbalance analysis and ratio calculations

  • Orderbook depth analysis within price ranges

  • Cumulative delta tracking and trade flow statistics

  • Liquidity level identification and concentration analysis

  • Comprehensive orderbook statistics and health metrics

Detection Algorithms:
  • Iceberg order detection with confidence scoring

  • Order clustering analysis for institutional activity

  • Advanced market microstructure metrics

  • Hidden liquidity and volume pattern recognition

Example

>>> # V3.1: Using TradingSuite's integrated orderbook
>>> suite = await TradingSuite.create("MNQ", features=["orderbook"])
>>>
>>> # V3.1: Register event handlers via suite's EventBus
>>> @suite.events.on(EventType.MARKET_DEPTH_UPDATE)
>>> async def handle_depth(event):
...     data = event.data
...     print(f"Depth: {data['bids'][0]['price']} @ {data['bids'][0]['size']}")
>>>
>>> # Get basic orderbook data
>>> snapshot = await suite.orderbook.get_orderbook_snapshot()
>>> print(f"Spread: {snapshot['spread']}")
>>>
>>> # Advanced analytics
>>> imbalance = await suite.orderbook.get_market_imbalance()
>>> liquidity = await suite.orderbook.get_liquidity_levels()
>>>
>>> # Detection algorithms
>>> icebergs = await suite.orderbook.detect_iceberg_orders()
>>> clusters = await suite.orderbook.detect_order_clusters()
>>>
>>> # Volume profiling
>>> profile = await suite.orderbook.get_volume_profile()
>>> support_resistance = await suite.orderbook.get_support_resistance_levels()
>>>
>>> # Cleanup handled automatically
>>> await suite.disconnect()
__init__(instrument, event_bus, project_x=None, timezone_str='America/Chicago', config=None)[source]

Initialize the orderbook.

Parameters:
  • instrument (str) – Trading instrument symbol

  • event_bus (Any) – EventBus instance for unified event handling. Required for all event emissions including market depth updates and trade ticks.

  • project_x (ProjectXBase | None) – Optional ProjectX client for tick size lookup

  • timezone_str (str) – Timezone for timestamps (default: America/Chicago)

  • config (OrderbookConfig | None) – Optional configuration for orderbook behavior

async initialize(realtime_client=None, subscribe_to_depth=True, subscribe_to_quotes=True)[source]

Initialize the orderbook with optional real-time data feed.

This method configures the orderbook for operation, sets up the memory manager, and optionally connects to the real-time data feed. It must be called after creating an OrderBook instance and before using any other methods.

The initialization process performs the following steps: 1. Starts the memory manager for automatic cleanup 2. If a realtime_client is provided:

  • Registers callbacks for market depth and quote updates

  • Subscribes to the specified data channels

  • Sets up WebSocket connection handlers

Parameters:
  • realtime_client (ProjectXRealtimeClient | None) – Async real-time client for WebSocket data. If provided, the orderbook will receive live market data updates. If None, the orderbook will function in historical/static mode only.

  • subscribe_to_depth (bool) – Subscribe to market depth updates (Level 2 data). Set to False only if you don’t need full order book data.

  • subscribe_to_quotes (bool) – Subscribe to quote updates (top of book data). Set to False only if you don’t need quote data.

Returns:

True if initialization successful, False if any part of the

initialization failed.

Return type:

bool

Example

>>> # V3.1: Initialization handled by TradingSuite
>>> suite = await TradingSuite.create("MNQ", features=["orderbook"])
>>> # Orderbook is automatically initialized with real-time connection
>>> if suite.orderbook:
...     print("Orderbook initialized and receiving real-time data")
... else:
...     print("Orderbook not available")
async get_market_imbalance(levels=10)[source]

Calculate order flow imbalance between bid and ask sides.

Delegates to MarketAnalytics.get_market_imbalance(). See MarketAnalytics.get_market_imbalance() for complete documentation.

Return type:

LiquidityAnalysisResponse

async get_orderbook_depth(price_range)[source]

Analyze orderbook depth within a price range.

Delegates to MarketAnalytics.get_orderbook_depth(). See MarketAnalytics.get_orderbook_depth() for complete documentation.

Return type:

MarketImpactResponse

async get_cumulative_delta(time_window_minutes=60)[source]

Get cumulative delta (buy volume - sell volume) over time window.

Delegates to MarketAnalytics.get_cumulative_delta(). See MarketAnalytics.get_cumulative_delta() for complete documentation.

Return type:

dict[str, Any]

async get_trade_flow_summary()[source]

Get comprehensive trade flow statistics.

Delegates to MarketAnalytics.get_trade_flow_summary(). See MarketAnalytics.get_trade_flow_summary() for complete documentation.

Return type:

dict[str, Any]

async get_liquidity_levels(min_volume=100, levels=20)[source]

Identify significant liquidity levels in the orderbook.

Delegates to MarketAnalytics.get_liquidity_levels(). See MarketAnalytics.get_liquidity_levels() for complete documentation.

Return type:

dict[str, Any]

async get_statistics()[source]

Get comprehensive orderbook statistics.

Delegates to MarketAnalytics.get_statistics(). See MarketAnalytics.get_statistics() for complete documentation.

Return type:

dict[str, Any]

async detect_iceberg_orders(min_refreshes=None, volume_threshold=None, time_window_minutes=None)[source]

Detect potential iceberg orders based on price level refresh patterns.

Delegates to OrderDetection.detect_iceberg_orders(). See OrderDetection.detect_iceberg_orders() for complete documentation.

Return type:

dict[str, Any]

async detect_order_clusters(min_cluster_size=3, price_tolerance=0.1)[source]

Detect clusters of orders at similar price levels.

Delegates to OrderDetection.detect_order_clusters(). See OrderDetection.detect_order_clusters() for complete documentation.

Return type:

list[dict[str, Any]]

async get_advanced_market_metrics()[source]

Calculate advanced market microstructure metrics.

Delegates to OrderDetection.get_advanced_market_metrics(). See OrderDetection.get_advanced_market_metrics() for complete documentation.

Return type:

OrderbookAnalysisResponse

async detect_spoofing(time_window_minutes=10, min_placement_frequency=3.0, min_cancellation_rate=0.8, max_time_to_cancel=30.0, min_distance_ticks=3, confidence_threshold=0.7)[source]

Detect potential spoofing patterns in order book behavior.

Delegates to OrderDetection.detect_spoofing(). See OrderDetection.detect_spoofing() for complete documentation.

Parameters:
  • time_window_minutes (int) – Time window for analysis (default: 10 minutes)

  • min_placement_frequency (float) – Minimum order placements per minute to consider

  • min_cancellation_rate (float) – Minimum cancellation rate (0.0-1.0) to flag

  • max_time_to_cancel (float) – Maximum average time to cancellation (seconds)

  • min_distance_ticks (int) – Minimum distance from best bid/ask in ticks

  • confidence_threshold (float) – Minimum confidence score to include in results

Return type:

list[SpoofingDetectionResponse]

Returns:

List of SpoofingDetectionResponse objects with detected patterns

Example

>>> # Using TradingSuite with orderbook
>>> suite = await TradingSuite.create("MNQ", features=["orderbook"])
>>> spoofing = await suite.orderbook.detect_spoofing()
>>> for detection in spoofing:
...     print(
...         f"Spoofing: {detection['pattern']} at {detection['price']:.2f}"
...     )
async get_volume_profile(time_window_minutes=60, price_bins=20)[source]

Calculate volume profile showing volume distribution by price.

Delegates to VolumeProfile.get_volume_profile(). See VolumeProfile.get_volume_profile() for complete documentation.

Return type:

dict[str, Any]

async get_support_resistance_levels(lookback_minutes=120, min_touches=3, price_tolerance=0.1)[source]

Identify support and resistance levels based on price history.

Delegates to VolumeProfile.get_support_resistance_levels(). See VolumeProfile.get_support_resistance_levels() for complete documentation.

Return type:

dict[str, Any]

async get_spread_analysis(window_minutes=30)[source]

Analyze bid-ask spread patterns over time.

Delegates to VolumeProfile.get_spread_analysis(). See VolumeProfile.get_spread_analysis() for complete documentation.

Return type:

LiquidityAnalysisResponse

get_memory_stats()[source]

Get comprehensive memory usage statistics.

Delegates to MemoryManager.get_memory_stats(). See MemoryManager.get_memory_stats() for complete documentation.

Return type:

dict[str, Any]

async cleanup()[source]

Clean up resources and disconnect from real-time feeds.

Return type:

None

Data Factory Functions

create_orderbook(instrument, event_bus, project_x=None, realtime_client=None, timezone_str='America/Chicago')[source]

Factory function to create an orderbook.

This factory function creates and returns an OrderBook instance for the specified instrument. It simplifies the process of creating an orderbook by handling the initial configuration. Note that the returned orderbook is not yet initialized - you must call the initialize() method separately to start the orderbook’s functionality.

The factory approach provides several benefits: 1. Ensures consistent orderbook creation across the application 2. Allows for future extension with pre-configured orderbook variants 3. Simplifies the API for common use cases

Parameters:
  • instrument (str) – Trading instrument symbol (e.g., “ES”, “NQ”, “MES”, “MNQ”). This should be the base symbol without contract-specific extensions.

  • project_x (ProjectXBase | None) – Optional AsyncProjectX client for tick size lookup and API access. If provided, the orderbook will be able to look up tick sizes and other contract details automatically.

  • realtime_client (ProjectXRealtimeClient | None) – Optional real-time client for WebSocket data. This is kept for compatibility but should be passed to initialize() instead.

  • timezone_str (str) – Timezone for timestamps (default: “America/Chicago”). All timestamps in the orderbook will be converted to this timezone.

Returns:

Orderbook instance that must be initialized with a call to initialize() before use.

Return type:

OrderBook

Example

>>> # V3.1: Use TradingSuite instead of factory function
>>> # This function is deprecated - use TradingSuite.create()
>>> suite = await TradingSuite.create(
...     instrument="MNQ",
...     features=["orderbook"],
...     timezone_str="America/Chicago",  # CME timezone
... )
>>>
>>> # Orderbook is automatically initialized
>>> # Access via suite.orderbook
>>> snapshot = await suite.orderbook.get_orderbook_snapshot()
>>>
>>> # Note: create_orderbook is maintained for backward compatibility
>>> # but TradingSuite is the recommended approach

Instrument Models

class Instrument(id, name, description, tickSize, tickValue, activeContract, symbolId=None)[source]

Bases: object

Represents a tradeable financial instrument/contract.

id

Unique contract identifier used in API calls

Type:

str

name

Contract name/symbol (e.g., “MNQU25”, “ESH25”)

Type:

str

description

Human-readable description of the contract

Type:

str

tickSize

Minimum price movement (e.g., 0.1)

Type:

float

tickValue

Dollar value per tick movement

Type:

float

activeContract

Whether the contract is currently active for trading

Type:

bool

Example

>>> print(f"Trading {instrument.name}")
>>> print(
...     f"Tick size: ${instrument.tickSize}, Tick value: ${instrument.tickValue}"
... )
id: str
name: str
description: str
tickSize: float
tickValue: float
activeContract: bool
symbolId: str | None = None
__init__(id, name, description, tickSize, tickValue, activeContract, symbolId=None)

Note

Technical Indicators: For comprehensive technical analysis with 55+ indicators, see the Technical Indicators module which provides TA-Lib compatible indicators optimized for Polars DataFrames.

Data Utilities

create_data_snapshot(data, description='')[source]

Create a comprehensive snapshot of DataFrame for debugging/analysis.

Parameters:
  • data (DataFrame) – Polars DataFrame

  • description (str) – Optional description

Returns:

Data snapshot with statistics

Return type:

dict

Example

>>> snapshot = create_data_snapshot(ohlcv_data, "MGC 5min data")
>>> print(f"Rows: {snapshot['row_count']}")
>>> print(f"Timespan: {snapshot['timespan']}")
convert_timeframe_to_seconds(timeframe)[source]

Convert timeframe string to seconds.

Parameters:

timeframe (str) – Timeframe (e.g., “1min”, “5min”, “1hr”, “1day”)

Returns:

Timeframe in seconds

Return type:

int

Example

>>> convert_timeframe_to_seconds("5min")
300
>>> convert_timeframe_to_seconds("1hr")
3600
get_market_session_info(timezone='America/Chicago')[source]

Get detailed market session information.

Parameters:

timezone (str) – Market timezone

Returns:

Market session details

Return type:

dict

Example

>>> info = get_market_session_info()
>>> print(f"Market open: {info['is_open']}")
>>> print(f"Next session: {info['next_session_start']}")
is_market_hours(timezone='America/Chicago')[source]

Check if it’s currently market hours (CME futures).

Parameters:

timezone (str) – Timezone to check (default: CME time)

Returns:

True if market is open

Return type:

bool