Data API
Real-time market data, Level 2 orderbook analysis, and institutional-grade market microstructure tools.
Real-time Data Management
Real-time Client
- class ProjectXRealtimeClient(jwt_token, account_id, user_hub_url=None, market_hub_url=None, config=None)
Bases: ConnectionManagementMixin, EventHandlingMixin, HealthMonitoringMixin, SubscriptionsMixin, TaskManagerMixin
Async real-time client for ProjectX Gateway API WebSocket connections.
CRITICAL FIXES (v3.3.1): This class now includes comprehensive safety mechanisms to prevent deadlocks, memory leaks, and connection failures in production environments.
This class provides an async interface for ProjectX SignalR connections and forwards all events to registered managers. It does NOT cache data or perform business logic - that’s handled by the specialized managers.
- Safety Features (v3.3.1):
Task Lifecycle Management: Automatic tracking and cleanup of async tasks
Deadlock Prevention: Timeout-based token refresh with state recovery
Memory Leak Protection: WeakSet-based task tracking prevents accumulation
Connection Recovery: Automatic rollback to stable state on failures
Health Monitoring: Comprehensive connection health tracking and automatic recovery
- Features:
Async SignalR WebSocket connections to ProjectX Gateway hubs
Event forwarding to registered async managers
Automatic reconnection with exponential backoff
JWT token refresh and reconnection with deadlock prevention
Real-time connection health monitoring with heartbeat mechanism
Latency tracking and performance metrics
Automatic reconnection based on health thresholds
Async event callbacks
Thread-safe event processing and callback execution
Comprehensive connection statistics and health tracking
NEW: Centralized task management prevents memory leaks
NEW: Connection state recovery on failures
NEW: Health-based automatic reconnection triggers
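The reconnection behaviour listed above can be illustrated with a minimal sketch of exponential backoff. The helper names (`backoff_delays`, `connect_with_backoff`) and the base, factor, and cap values are assumptions made for this example; they are not taken from the client's implementation.

```python
import asyncio


def backoff_delays(base=1.0, factor=2.0, cap=30.0, attempts=6):
    """Return the wait in seconds before each retry, doubling up to a cap."""
    return [min(cap, base * factor ** n) for n in range(attempts)]


async def connect_with_backoff(connect, delays, sleep=asyncio.sleep):
    """Call connect() until it returns True; return the number of attempts used.

    Raises ConnectionError when every attempt fails.
    """
    for attempt, delay in enumerate(delays, start=1):
        if await connect():
            return attempt
        await sleep(delay)
    raise ConnectionError("all reconnection attempts failed")


async def _backoff_demo():
    waits = []
    outcomes = iter([False, False, True])  # fail twice, then succeed

    async def flaky_connect():
        return next(outcomes)

    async def fake_sleep(delay):
        waits.append(delay)  # record the delay instead of really sleeping

    attempts = await connect_with_backoff(
        flaky_connect, backoff_delays(), sleep=fake_sleep
    )
    return attempts, waits


backoff_demo = asyncio.run(_backoff_demo())
```

Passing the sleep function as a parameter keeps the sketch testable without real waiting.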
- Architecture:
Pure event forwarding (no business logic)
No data caching (handled by managers)
No payload parsing (managers handle ProjectX formats)
Minimal stateful operations
Mixin-based design for modular functionality
NEW: TaskManagerMixin provides automatic task cleanup
- Real-time Hubs (per ProjectX Gateway docs):
User Hub: Account, position, and order updates
Market Hub: Quote, trade, and market depth data
- Connection Management:
Dual-hub SignalR connections with automatic reconnection
JWT token authentication via URL query parameter (required by ProjectX Gateway)
Real-time health monitoring with heartbeat latency tracking
Automatic health-based reconnection when performance degrades
Connection error handling and performance metrics
Thread-safe operations with proper lock management
NEW: Timeout-based operations prevent indefinite blocking
NEW: Connection state recovery preserves subscriptions
NEW: Health thresholds trigger automatic recovery
- Event Processing:
Cross-thread event scheduling for asyncio compatibility
Support for both async and sync callbacks
Error isolation to prevent callback failures
Event statistics and flow monitoring
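A rough sketch of what error isolation with mixed async and sync callbacks can look like follows. The `dispatch` function is hypothetical and written for this example only; the client's own event forwarding may be structured differently.

```python
import asyncio
import inspect
import logging

logger = logging.getLogger("event_dispatch_sketch")


async def dispatch(callbacks, payload):
    """Call every callback with payload; return how many completed without error."""
    succeeded = 0
    for callback in callbacks:
        try:
            result = callback(payload)
            # Async callbacks return an awaitable; sync callbacks have already run.
            if inspect.isawaitable(result):
                await result
            succeeded += 1
        except Exception:
            # One failing callback must not stop the remaining ones.
            logger.exception("callback %r failed", callback)
    return succeeded


received = []


async def async_callback(payload):
    received.append(("async", payload))


def sync_callback(payload):
    received.append(("sync", payload))


def broken_callback(payload):
    raise RuntimeError("handler bug")


dispatched = asyncio.run(
    dispatch([async_callback, broken_callback, sync_callback], {"netPos": 2})
)
```

Here the broken callback is logged and skipped, while the callbacks before and after it still receive the payload.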
- Task Management (v3.3.1):
All background tasks are automatically tracked
WeakSet-based tracking prevents memory leaks
Graceful cancellation with configurable timeouts
Error collection and reporting for failed tasks
Statistics available via get_task_stats()
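To make the WeakSet idea concrete, here is a minimal sketch of a task tracker. `TaskTracker` is a hypothetical class, not the library's TaskManagerMixin; only the statistic keys `pending_tasks` and `failed_tasks` are borrowed from the documented example.

```python
import asyncio
import weakref


class TaskTracker:
    """Hypothetical helper that tracks background tasks without owning them."""

    def __init__(self):
        # WeakSet entries disappear once a finished task is garbage collected,
        # so the set cannot grow without bound.
        self._tasks = weakref.WeakSet()
        self._errors = []

    def create_task(self, coro):
        task = asyncio.get_running_loop().create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._collect_error)
        return task

    def _collect_error(self, task):
        if not task.cancelled() and task.exception() is not None:
            self._errors.append(task.exception())

    async def cleanup(self, timeout=5.0):
        """Cancel unfinished tasks and wait up to timeout seconds for them."""
        pending = [t for t in self._tasks if not t.done()]
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.wait(pending, timeout=timeout)

    def stats(self):
        tasks = list(self._tasks)
        return {
            "pending_tasks": sum(1 for t in tasks if not t.done()),
            "failed_tasks": len(self._errors),
        }


async def _tracker_demo():
    tracker = TaskTracker()

    async def fails():
        raise ValueError("boom")

    tracker.create_task(asyncio.sleep(60))  # long-running background task
    failed = tracker.create_task(fails())
    await asyncio.wait([failed])
    before = tracker.stats()
    await tracker.cleanup(timeout=1.0)
    return {"before": before, "after": tracker.stats()}


tracker_demo = asyncio.run(_tracker_demo())
```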
Example
>>> # V3.1: Use TradingSuite for automatic real-time management
>>> suite = await TradingSuite.create("MNQ", timeframes=["1min"])
>>> # V3.1: Access real-time client if needed
>>> print(f"Connected: {suite.realtime_client.is_connected()}")
>>>
>>> # V3.3.1: Task management statistics
>>> task_stats = suite.realtime_client.get_task_stats()
>>> print(f"Active tasks: {task_stats['pending_tasks']}")
>>> print(f"Failed tasks: {task_stats['failed_tasks']}")
>>>
>>> # V3.3.1: Health monitoring (NEW)
>>> health_status = await suite.realtime_client.get_health_status()
>>> print(f"Health Score: {health_status['health_score']}/100")
>>> print(f"User Hub Latency: {health_status['user_hub_latency_ms']}ms")
>>> print(f"Market Hub Latency: {health_status['market_hub_latency_ms']}ms")
>>>
>>> # V3.3.1: Configure health monitoring
>>> await suite.realtime_client.configure_health_monitoring(
...     heartbeat_interval=5.0,  # Check every 5 seconds
...     health_threshold=75.0,  # Reconnect if health < 75
...     latency_threshold_ms=1000,  # Alert if latency > 1000ms
... )
>>>
>>> # V3.1: Register callbacks via suite's event bus
>>> from project_x_py import EventType
>>> async def handle_position(event):
...     data = event.data
...     print(f"Position: {data.get('contractId')} - {data.get('netPos')}")
>>> await suite.on(EventType.POSITION_UPDATE, handle_position)
>>>
>>> # V3.3.1: Safe token refresh with deadlock prevention
>>> try:
...     success = await suite.realtime_client.update_jwt_token(
...         new_token, timeout=30.0
...     )
...     if not success:
...         print(
...             "Token refresh failed, connection recovered to original state"
...         )
... except TimeoutError:
...     print("Token refresh timed out, deadlock prevented")
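The timeout-and-rollback pattern behind the token refresh call can be sketched roughly as follows. `Session` and its `refresh` method are hypothetical stand-ins; the sketch only mirrors the documented outcome (False on failure, TimeoutError on timeout) and assumes nothing about how `update_jwt_token` is implemented.

```python
import asyncio


class Session:
    """Hypothetical connection holder; only the token matters for this sketch."""

    def __init__(self, token):
        self.token = token

    async def refresh(self, new_token, reconnect, timeout):
        previous = self.token
        self.token = new_token
        try:
            # wait_for cancels reconnect() if it runs longer than timeout.
            await asyncio.wait_for(reconnect(), timeout=timeout)
        except asyncio.TimeoutError:
            self.token = previous  # roll back before reporting the timeout
            raise TimeoutError("token refresh timed out") from None
        except ConnectionError:
            self.token = previous  # roll back to the last known-good token
            return False
        return True


async def _refresh_demo():
    async def fast():
        return None

    async def refused():
        raise ConnectionError("hub rejected token")

    async def slow():
        await asyncio.sleep(10)

    session = Session("old")
    ok = await session.refresh("new", fast, timeout=0.5)
    failed = await session.refresh("bad", refused, timeout=0.5)
    timed_out = False
    try:
        await session.refresh("stuck", slow, timeout=0.05)
    except TimeoutError:
        timed_out = True
    return ok, failed, timed_out, session.token


refresh_demo = asyncio.run(_refresh_demo())
```

In both failure paths the session ends up holding the last token that worked, which is the property the example above prints as "recovered to original state".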
- Event Types (per ProjectX Gateway docs):
User Hub: GatewayUserAccount, GatewayUserPosition, GatewayUserOrder, GatewayUserTrade
Market Hub: GatewayQuote, GatewayDepth, GatewayTrade
- Integration:
AsyncPositionManager handles position events and caching
AsyncOrderManager handles order events and tracking
AsyncRealtimeDataManager handles market data and caching
This client only handles connections and event forwarding
- Production Reliability (v3.3.1):
Zero memory leaks from task accumulation
No deadlocks during token refresh operations
Automatic recovery from connection failures
Comprehensive error handling and logging
Performance monitoring through task statistics
- __init__(jwt_token, account_id, user_hub_url=None, market_hub_url=None, config=None)
Initialize async ProjectX real-time client with configurable SignalR connections.
Creates a dual-hub SignalR client for real-time ProjectX Gateway communication. Handles both user-specific events (positions, orders) and market data (quotes, trades).
- Parameters:
jwt_token (str) – JWT authentication token from AsyncProjectX.authenticate(). Must be valid and not expired for successful connection.
account_id (str) – ProjectX account ID for user-specific subscriptions. Used to filter position, order, and trade events.
user_hub_url (str, optional) – Override URL for user hub endpoint. If provided, takes precedence over config URL. Defaults to None (uses config or default).
market_hub_url (str, optional) – Override URL for market hub endpoint. If provided, takes precedence over config URL. Defaults to None (uses config or default).
config (ProjectXConfig, optional) – Configuration object with hub URLs. Provides default URLs if direct URLs not specified. Defaults to None (uses TopStepX defaults).
- URL Priority:
Direct parameters (user_hub_url, market_hub_url)
Config URLs (config.user_hub_url, config.market_hub_url)
Default TopStepX endpoints
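The priority order above amounts to a simple fallback chain. The sketch below uses a hypothetical `HubConfig` dataclass in place of ProjectXConfig and a placeholder default URL, since the actual default endpoints are not stated in this section.

```python
from dataclasses import dataclass
from typing import Optional

# Placeholder only; not the real default endpoint.
DEFAULT_USER_HUB_URL = "https://example.invalid/hubs/user"


@dataclass
class HubConfig:
    """Hypothetical stand-in for ProjectXConfig, holding only hub URLs."""

    user_hub_url: Optional[str] = None
    market_hub_url: Optional[str] = None


def resolve_hub_url(direct, from_config, default):
    """Return the first URL that is set: direct parameter, config value, default."""
    if direct:
        return direct
    if from_config:
        return from_config
    return default


hub_config = HubConfig(user_hub_url="https://config.example/hubs/user")

from_direct = resolve_hub_url(
    "https://override.example/hubs/user", hub_config.user_hub_url, DEFAULT_USER_HUB_URL
)
from_config_only = resolve_hub_url(None, hub_config.user_hub_url, DEFAULT_USER_HUB_URL)
from_default = resolve_hub_url(None, None, DEFAULT_USER_HUB_URL)
```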
Example
>>> # V3: Using factory function (recommended)
>>> client = await create_realtime_client(
...     jwt_token=client.get_session_token(),
...     account_id=str(client.get_account_info().id),
... )
>>> # V3: Using direct instantiation with default endpoints
>>> client = ProjectXRealtimeClient(jwt_token=jwt_token, account_id="12345")
>>>
>>> # V3: Using custom config for different environments
>>> from project_x_py.models import ProjectXConfig
>>> config = ProjectXConfig(
...     user_hub_url="https://gateway.topstepx.com/hubs/user",
...     market_hub_url="https://gateway.topstepx.com/hubs/market",
... )
>>> client = ProjectXRealtimeClient(
...     jwt_token=jwt_token, account_id="12345", config=config
... )
>>>
>>> # V3: Override specific URL for testing
>>> client = ProjectXRealtimeClient(
...     jwt_token=jwt_token,
...     account_id="12345",
...     market_hub_url="https://test.topstepx.com/hubs/market",
... )
Note
JWT token is passed via URL query parameter (required by ProjectX Gateway)
Both hubs must connect successfully for full functionality
SignalR connections are established lazily on connect()
Orderbook Analysis
Note
Level 2 OrderBook API: For comprehensive Level 2 orderbook analysis including iceberg detection, market imbalance analysis, and institutional-grade market microstructure tools, see the dedicated OrderBook API module.
- class OrderBook(instrument, event_bus, project_x=None, timezone_str='America/Chicago', config=None)
Bases: OrderBookBase
Async Level 2 Orderbook with comprehensive market analysis.
This class combines all orderbook functionality into a single interface, providing a unified API for accessing real-time market depth data, advanced analytics, detection algorithms, and volume profiling. It uses a component-based architecture where specialized functionality is delegated to dedicated components while maintaining a simple, cohesive interface for the client code.
- Key Components:
realtime_handler: Manages WebSocket connections and real-time data processing
analytics: Provides market analytics (imbalance, depth, delta, liquidity)
detection: Implements detection algorithms (iceberg, clusters)
profile: Handles volume profiling and support/resistance analysis
memory_manager: Manages memory usage and cleanup tasks
- Thread Safety:
All methods are thread-safe and can be called concurrently from multiple asyncio tasks. Data consistency is maintained through internal locks.
- Memory Management:
The orderbook implements automatic memory management through the MemoryManager component, which periodically cleans up historical data based on configurable parameters to prevent memory leaks during long-running sessions.
- Real-time Features:
WebSocket-based Level 2 market depth updates
Immediate trade execution detection and classification
Real-time spread and price level monitoring
Event-driven callback system for custom logic
Automatic data validation and error handling
- Analytics Capabilities:
Market imbalance analysis and ratio calculations
Orderbook depth analysis within price ranges
Cumulative delta tracking and trade flow statistics
Liquidity level identification and concentration analysis
Comprehensive orderbook statistics and health metrics
- Detection Algorithms:
Iceberg order detection with confidence scoring
Order clustering analysis for institutional activity
Advanced market microstructure metrics
Hidden liquidity and volume pattern recognition
Example
>>> # V3.1: Using TradingSuite's integrated orderbook
>>> suite = await TradingSuite.create("MNQ", features=["orderbook"])
>>>
>>> # V3.1: Register event handlers via suite's EventBus
>>> @suite.events.on(EventType.MARKET_DEPTH_UPDATE)
... async def handle_depth(event):
...     data = event.data
...     print(f"Depth: {data['bids'][0]['price']} @ {data['bids'][0]['size']}")
>>>
>>> # Get basic orderbook data
>>> snapshot = await suite.orderbook.get_orderbook_snapshot()
>>> print(f"Spread: {snapshot['spread']}")
>>>
>>> # Advanced analytics
>>> imbalance = await suite.orderbook.get_market_imbalance()
>>> liquidity = await suite.orderbook.get_liquidity_levels()
>>>
>>> # Detection algorithms
>>> icebergs = await suite.orderbook.detect_iceberg_orders()
>>> clusters = await suite.orderbook.detect_order_clusters()
>>>
>>> # Volume profiling
>>> profile = await suite.orderbook.get_volume_profile()
>>> support_resistance = await suite.orderbook.get_support_resistance_levels()
>>>
>>> # Cleanup handled automatically
>>> await suite.disconnect()
- __init__(instrument, event_bus, project_x=None, timezone_str='America/Chicago', config=None)
Initialize the orderbook.
- Parameters:
instrument (str) – Trading instrument symbol
event_bus (Any) – EventBus instance for unified event handling. Required for all event emissions including market depth updates and trade ticks.
project_x (ProjectXBase | None) – Optional ProjectX client for tick size lookup
timezone_str (str) – Timezone for timestamps (default: America/Chicago)
config (OrderbookConfig | None) – Optional configuration for orderbook behavior
- async initialize(realtime_client=None, subscribe_to_depth=True, subscribe_to_quotes=True)
Initialize the orderbook with optional real-time data feed.
This method configures the orderbook for operation, sets up the memory manager, and optionally connects to the real-time data feed. It must be called after creating an OrderBook instance and before using any other methods.
The initialization process performs the following steps:
1. Starts the memory manager for automatic cleanup
2. If a realtime_client is provided:
Registers callbacks for market depth and quote updates
Subscribes to the specified data channels
Sets up WebSocket connection handlers
- Parameters:
realtime_client (ProjectXRealtimeClient | None) – Async real-time client for WebSocket data. If provided, the orderbook will receive live market data updates. If None, the orderbook will function in historical/static mode only.
subscribe_to_depth (bool) – Subscribe to market depth updates (Level 2 data). Set to False only if you don’t need full order book data.
subscribe_to_quotes (bool) – Subscribe to quote updates (top of book data). Set to False only if you don’t need quote data.
- Returns:
True if initialization was successful, False if any part of the initialization failed.
- Return type:
bool
Example
>>> # V3.1: Initialization handled by TradingSuite
>>> suite = await TradingSuite.create("MNQ", features=["orderbook"])
>>> # Orderbook is automatically initialized with real-time connection
>>> if suite.orderbook:
...     print("Orderbook initialized and receiving real-time data")
... else:
...     print("Orderbook not available")
- async get_market_imbalance(levels=10)
Calculate order flow imbalance between bid and ask sides.
Delegates to MarketAnalytics.get_market_imbalance(). See MarketAnalytics.get_market_imbalance() for complete documentation.
- Return type:
LiquidityAnalysisResponse
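For orientation, one common way to express order flow imbalance is the normalized difference between bid and ask volume over the top N levels. The sketch below uses that definition as an assumption; the formula and response fields used by MarketAnalytics.get_market_imbalance() may differ.

```python
def imbalance_ratio(bids, asks, levels=10):
    """Normalized bid/ask volume difference in the range [-1.0, 1.0].

    bids and asks are lists of (price, volume) tuples, best price first.
    Positive values mean more resting volume on the bid side.
    """
    bid_volume = sum(volume for _, volume in bids[:levels])
    ask_volume = sum(volume for _, volume in asks[:levels])
    total = bid_volume + ask_volume
    if total == 0:
        return 0.0  # empty book: no imbalance to report
    return (bid_volume - ask_volume) / total


sample_bids = [(100.0, 30), (99.75, 20)]
sample_asks = [(100.25, 10), (100.5, 40)]

top_of_book = imbalance_ratio(sample_bids, sample_asks, levels=1)   # (30 - 10) / 40
two_levels = imbalance_ratio(sample_bids, sample_asks, levels=2)    # (50 - 50) / 100
```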
- async get_orderbook_depth(price_range)
Analyze orderbook depth within a price range.
Delegates to MarketAnalytics.get_orderbook_depth(). See MarketAnalytics.get_orderbook_depth() for complete documentation.
- Return type:
MarketImpactResponse
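As an illustration only, depth within a price range can be read as the volume resting within `price_range` of the best bid and best ask. That interpretation, the function name, and the returned keys are assumptions for this sketch; see MarketAnalytics.get_orderbook_depth() for the actual semantics.

```python
def depth_within(bids, asks, price_range):
    """Sum resting volume within price_range of the best bid and best ask.

    bids and asks are non-empty lists of (price, volume), best price first.
    """
    best_bid = bids[0][0]
    best_ask = asks[0][0]
    bid_depth = sum(v for p, v in bids if p >= best_bid - price_range)
    ask_depth = sum(v for p, v in asks if p <= best_ask + price_range)
    return {"bid_depth": bid_depth, "ask_depth": ask_depth}


depth_bids = [(100.0, 30), (99.75, 20), (99.25, 15)]
depth_asks = [(100.25, 10), (100.5, 45), (101.5, 25)]

depth_result = depth_within(depth_bids, depth_asks, price_range=0.5)
```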
- async get_cumulative_delta(time_window_minutes=60)
Get cumulative delta (buy volume - sell volume) over time window.
Delegates to MarketAnalytics.get_cumulative_delta(). See MarketAnalytics.get_cumulative_delta() for complete documentation.
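The definition given above (buy volume minus sell volume over a time window) can be sketched in a few lines. The trade tuple layout and the "buy"/"sell" labels are assumptions for this example and do not describe the library's internal trade records.

```python
from datetime import datetime, timedelta, timezone


def cumulative_delta(trades, now, time_window_minutes=60):
    """Buy volume minus sell volume for trades inside the window ending at now.

    trades is an iterable of (timestamp, side, volume), side is "buy" or "sell".
    """
    cutoff = now - timedelta(minutes=time_window_minutes)
    delta = 0
    for timestamp, side, volume in trades:
        if timestamp < cutoff:
            continue  # trade is older than the window
        delta += volume if side == "buy" else -volume
    return delta


delta_now = datetime(2024, 1, 2, 15, 0, tzinfo=timezone.utc)
delta_trades = [
    (delta_now - timedelta(minutes=90), "buy", 100),
    (delta_now - timedelta(minutes=30), "buy", 12),
    (delta_now - timedelta(minutes=5), "sell", 7),
]

delta_60 = cumulative_delta(delta_trades, delta_now)                            # 12 - 7
delta_120 = cumulative_delta(delta_trades, delta_now, time_window_minutes=120)  # 100 + 12 - 7
```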
- async get_trade_flow_summary()
Get comprehensive trade flow statistics.
Delegates to MarketAnalytics.get_trade_flow_summary(). See MarketAnalytics.get_trade_flow_summary() for complete documentation.
- async get_liquidity_levels(min_volume=100, levels=20)
Identify significant liquidity levels in the orderbook.
Delegates to MarketAnalytics.get_liquidity_levels(). See MarketAnalytics.get_liquidity_levels() for complete documentation.
- async get_statistics()
Get comprehensive orderbook statistics.
Delegates to MarketAnalytics.get_statistics(). See MarketAnalytics.get_statistics() for complete documentation.
- async detect_iceberg_orders(min_refreshes=None, volume_threshold=None, time_window_minutes=None)
Detect potential iceberg orders based on price level refresh patterns.
Delegates to OrderDetection.detect_iceberg_orders(). See OrderDetection.detect_iceberg_orders() for complete documentation.
- async detect_order_clusters(min_cluster_size=3, price_tolerance=0.1)
Detect clusters of orders at similar price levels.
Delegates to OrderDetection.detect_order_clusters(). See OrderDetection.detect_order_clusters() for complete documentation.
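To show how `min_cluster_size` and `price_tolerance` might interact, here is a simple gap-based grouping sketch: sorted prices are chained together while neighbouring prices stay within the tolerance. This is an assumed approach for illustration, not the algorithm used by OrderDetection.detect_order_clusters().

```python
def find_clusters(prices, min_cluster_size=3, price_tolerance=0.1):
    """Group sorted prices whose neighbours are within price_tolerance.

    Returns only groups with at least min_cluster_size prices.
    """
    clusters = []
    current = []
    for price in sorted(prices):
        # A gap larger than the tolerance closes the current group.
        if current and price - current[-1] > price_tolerance:
            if len(current) >= min_cluster_size:
                clusters.append(current)
            current = []
        current.append(price)
    if len(current) >= min_cluster_size:
        clusters.append(current)
    return clusters


order_prices = [103.25, 100.0, 100.25, 102.0, 100.5, 103.0]
found_clusters = find_clusters(order_prices, min_cluster_size=3, price_tolerance=0.25)
```

With these inputs only the three prices around 100 form a cluster; the pair at 103 is too small and 102 stands alone.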
- async get_advanced_market_metrics()
Calculate advanced market microstructure metrics.
Delegates to OrderDetection.get_advanced_market_metrics(). See OrderDetection.get_advanced_market_metrics() for complete documentation.
- Return type:
OrderbookAnalysisResponse
- async detect_spoofing(time_window_minutes=10, min_placement_frequency=3.0, min_cancellation_rate=0.8, max_time_to_cancel=30.0, min_distance_ticks=3, confidence_threshold=0.7)
Detect potential spoofing patterns in order book behavior.
Delegates to OrderDetection.detect_spoofing(). See OrderDetection.detect_spoofing() for complete documentation.
- Parameters:
time_window_minutes (int) – Time window for analysis (default: 10 minutes)
min_placement_frequency (float) – Minimum order placements per minute to consider
min_cancellation_rate (float) – Minimum cancellation rate (0.0-1.0) to flag
max_time_to_cancel (float) – Maximum average time to cancellation (seconds)
min_distance_ticks (int) – Minimum distance from best bid/ask in ticks
confidence_threshold (float) – Minimum confidence score to include in results
- Return type:
list[SpoofingDetectionResponse]
- Returns:
List of SpoofingDetectionResponse objects with detected patterns
Example
>>> # Using TradingSuite with orderbook
>>> suite = await TradingSuite.create("MNQ", features=["orderbook"])
>>> spoofing = await suite.orderbook.detect_spoofing()
>>> for detection in spoofing:
...     print(
...         f"Spoofing: {detection['pattern']} at {detection['price']:.2f}"
...     )
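The threshold parameters of detect_spoofing() can be read as a set of filters applied per price level. The sketch below applies them to a hypothetical `LevelActivity` summary; the data structure and the filtering logic are assumptions for illustration, and confidence scoring is left out entirely.

```python
from dataclasses import dataclass


@dataclass
class LevelActivity:
    """Hypothetical per-price summary of order activity inside the window."""

    price: float
    placements: int
    cancellations: int
    avg_seconds_to_cancel: float
    distance_ticks: int


def flag_candidates(
    levels,
    time_window_minutes=10,
    min_placement_frequency=3.0,
    min_cancellation_rate=0.8,
    max_time_to_cancel=30.0,
    min_distance_ticks=3,
):
    """Return prices whose activity passes every threshold."""
    flagged = []
    for level in levels:
        if level.placements == 0:
            continue
        frequency = level.placements / time_window_minutes  # placements per minute
        cancellation_rate = level.cancellations / level.placements
        if (
            frequency >= min_placement_frequency
            and cancellation_rate >= min_cancellation_rate
            and level.avg_seconds_to_cancel <= max_time_to_cancel
            and level.distance_ticks >= min_distance_ticks
        ):
            flagged.append(level.price)
    return flagged


activity = [
    LevelActivity(100.0, placements=40, cancellations=36, avg_seconds_to_cancel=5.0, distance_ticks=4),
    LevelActivity(101.0, placements=40, cancellations=10, avg_seconds_to_cancel=5.0, distance_ticks=4),
    LevelActivity(99.0, placements=10, cancellations=10, avg_seconds_to_cancel=5.0, distance_ticks=4),
]
flagged_prices = flag_candidates(activity)
```

Only the first level passes: the second cancels too rarely (rate 0.25) and the third is placed too infrequently (1.0 per minute).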
- async get_volume_profile(time_window_minutes=60, price_bins=20)
Calculate volume profile showing volume distribution by price.
Delegates to VolumeProfile.get_volume_profile(). See VolumeProfile.get_volume_profile() for complete documentation.
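A volume profile is essentially a histogram of traded volume over price. The following sketch bins trades into `price_bins` equal-width buckets and picks the bucket with the most volume; the input format and return shape are assumptions and do not reflect the structure returned by VolumeProfile.get_volume_profile().

```python
def volume_profile(trades, price_bins=20):
    """Bin traded volume by price.

    trades is a list of (price, volume). Returns a list of
    (bin_low, bin_high, volume) tuples ordered from lowest to highest price.
    """
    if not trades:
        return []
    prices = [price for price, _ in trades]
    low, high = min(prices), max(prices)
    # Fall back to a width of 1.0 when every trade is at the same price.
    width = (high - low) / price_bins or 1.0
    volumes = [0] * price_bins
    for price, volume in trades:
        # The highest price would land one past the end, so clamp it.
        index = min(int((price - low) / width), price_bins - 1)
        volumes[index] += volume
    return [
        (low + i * width, low + (i + 1) * width, volumes[i])
        for i in range(price_bins)
    ]


profile_trades = [(100.0, 5), (100.5, 10), (101.0, 20), (102.0, 7)]
profile_bins = volume_profile(profile_trades, price_bins=4)

# The bin with the most volume is often called the point of control.
point_of_control = max(profile_bins, key=lambda bin_: bin_[2])
```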
- async get_support_resistance_levels(lookback_minutes=120, min_touches=3, price_tolerance=0.1)
Identify support and resistance levels based on price history.
Delegates to VolumeProfile.get_support_resistance_levels(). See VolumeProfile.get_support_resistance_levels() for complete documentation.
- async get_spread_analysis(window_minutes=30)
Analyze bid-ask spread patterns over time.
Delegates to VolumeProfile.get_spread_analysis(). See VolumeProfile.get_spread_analysis() for complete documentation.
- Return type:
LiquidityAnalysisResponse
Data Factory Functions
- create_orderbook(instrument, event_bus, project_x=None, realtime_client=None, timezone_str='America/Chicago')
Factory function to create an orderbook.
This factory function creates and returns an OrderBook instance for the specified instrument. It simplifies the process of creating an orderbook by handling the initial configuration. Note that the returned orderbook is not yet initialized - you must call the initialize() method separately to start the orderbook’s functionality.
The factory approach provides several benefits:
1. Ensures consistent orderbook creation across the application
2. Allows for future extension with pre-configured orderbook variants
3. Simplifies the API for common use cases
- Parameters:
instrument (str) – Trading instrument symbol (e.g., “ES”, “NQ”, “MES”, “MNQ”). This should be the base symbol without contract-specific extensions.
project_x (ProjectXBase | None) – Optional AsyncProjectX client for tick size lookup and API access. If provided, the orderbook will be able to look up tick sizes and other contract details automatically.
realtime_client (ProjectXRealtimeClient | None) – Optional real-time client for WebSocket data. This is kept for compatibility but should be passed to initialize() instead.
timezone_str (str) – Timezone for timestamps (default: “America/Chicago”). All timestamps in the orderbook will be converted to this timezone.
- Returns:
Orderbook instance that must be initialized with a call to initialize() before use.
- Return type:
OrderBook
Example
>>> # V3.1: Use TradingSuite instead of factory function
>>> # This function is deprecated - use TradingSuite.create()
>>> suite = await TradingSuite.create(
...     instrument="MNQ",
...     features=["orderbook"],
...     timezone_str="America/Chicago",  # CME timezone
... )
>>>
>>> # Orderbook is automatically initialized
>>> # Access via suite.orderbook
>>> snapshot = await suite.orderbook.get_orderbook_snapshot()
>>>
>>> # Note: create_orderbook is maintained for backward compatibility
>>> # but TradingSuite is the recommended approach
Instrument Models
- class Instrument(id, name, description, tickSize, tickValue, activeContract, symbolId=None)
Bases: object
Represents a tradeable financial instrument/contract.
Example
>>> print(f"Trading {instrument.name}")
>>> print(
...     f"Tick size: ${instrument.tickSize}, Tick value: ${instrument.tickValue}"
... )
- __init__(id, name, description, tickSize, tickValue, activeContract, symbolId=None)
Note
Technical Indicators: For comprehensive technical analysis with 55+ indicators, see the Technical Indicators module which provides TA-Lib compatible indicators optimized for Polars DataFrames.
Data Utilities
- create_data_snapshot(data, description='')
Create a comprehensive snapshot of DataFrame for debugging/analysis.
- Parameters:
data (DataFrame) – Polars DataFrame
description (str) – Optional description
- Returns:
Data snapshot with statistics
- Return type:
Example
>>> snapshot = create_data_snapshot(ohlcv_data, "MGC 5min data")
>>> print(f"Rows: {snapshot['row_count']}")
>>> print(f"Timespan: {snapshot['timespan']}")
- convert_timeframe_to_seconds(timeframe)
Convert timeframe string to seconds.
- Parameters:
timeframe (str) – Timeframe (e.g., “1min”, “5min”, “1hr”, “1day”)
- Returns:
Timeframe in seconds
- Return type:
int
Example
>>> convert_timeframe_to_seconds("5min")
300
>>> convert_timeframe_to_seconds("1hr")
3600
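For readers who want to see how such a conversion can work, here is a small standalone sketch. `timeframe_to_seconds` is a hypothetical reimplementation covering only the unit suffixes shown in the parameter description (min, hr, day); raising ValueError for anything else is an assumption, and the library function may accept more formats or handle errors differently.

```python
import re

# Seconds per unit for the suffixes mentioned in the documentation.
_UNIT_SECONDS = {"min": 60, "hr": 3600, "day": 86400}


def timeframe_to_seconds(timeframe):
    """Convert strings such as "5min" or "1hr" to a number of seconds."""
    match = re.fullmatch(r"(\d+)(min|hr|day)", timeframe.strip().lower())
    if match is None:
        raise ValueError(f"unsupported timeframe: {timeframe!r}")
    count, unit = match.groups()
    return int(count) * _UNIT_SECONDS[unit]


five_minutes = timeframe_to_seconds("5min")
one_hour = timeframe_to_seconds("1hr")
one_day = timeframe_to_seconds("1day")
```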