Statistics & Analytics API
Comprehensive async-first statistics system with health monitoring and performance tracking (v3.3.0+).
Overview
The statistics system provides centralized collection and analysis of performance metrics across all SDK components. Features include:
100% Async Architecture: All statistics methods use async/await for optimal performance
Multi-format Export: JSON, Prometheus, CSV, and Datadog formats with data sanitization
Component-Specific Tracking: Enhanced statistics for all managers with specialized metrics
Health Monitoring: Intelligent 0-100 health scoring with configurable thresholds
Performance Optimization: TTL caching and parallel collection across components
Memory Efficiency: Circular buffers and lock-free reads for frequently accessed metrics
Core Components
Statistics Aggregator
- class StatisticsAggregator(cache_ttl=5.0, component_timeout=1.0)[source]
Bases: BaseStatisticsTracker
Centralized statistics aggregation for all ProjectX SDK components.
Provides parallel collection from registered components, cross-component metrics calculation, health score aggregation, and performance optimization through TTL caching. Handles component failures gracefully with timeout protection and partial result recovery.
- Features:
Async component registration and management
Parallel statistics collection using asyncio.gather()
Cross-component metrics calculation (total errors, combined P&L)
Health score aggregation with weighted averages
TTL caching for expensive operations (5-second default)
Timeout protection (1 second per component)
Graceful error handling with partial results
Type-safe statistics using ComprehensiveStats
- Performance Optimizations:
Parallel collection reduces total time to max component time
TTL caching prevents redundant calculations within cache window
Timeout protection prevents hanging on failed components
Memory-efficient handling of large statistics datasets
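The parallel collection and timeout behaviour described above can be sketched with the standard library alone. This is a minimal illustration, not the SDK's implementation: collect_all, collect_one, and the fast/slow/broken coroutines are hypothetical names, and the shape of the fallback dictionaries is an assumption.

```python
import asyncio
from typing import Any, Awaitable, Callable

# Hypothetical stand-in for a registered component's statistics coroutine.
StatsFn = Callable[[], Awaitable[dict[str, Any]]]


async def collect_all(
    components: dict[str, StatsFn], timeout: float = 1.0
) -> dict[str, dict[str, Any]]:
    """Collect stats from every component concurrently, keeping partial results."""

    async def collect_one(name: str, fn: StatsFn) -> tuple[str, dict[str, Any]]:
        try:
            # Each component gets its own timeout, so one slow component
            # cannot stall the others.
            return name, await asyncio.wait_for(fn(), timeout=timeout)
        except asyncio.TimeoutError:
            return name, {"status": "timeout"}
        except Exception as exc:
            # A failing component yields an error marker instead of aborting the run.
            return name, {"status": "error", "error": str(exc)}

    pairs = await asyncio.gather(
        *(collect_one(name, fn) for name, fn in components.items())
    )
    return dict(pairs)


async def fast() -> dict[str, Any]:
    return {"status": "ok", "calls": 3}


async def slow() -> dict[str, Any]:
    await asyncio.sleep(0.5)
    return {"status": "ok"}


async def broken() -> dict[str, Any]:
    raise RuntimeError("boom")
```

Because the coroutines run concurrently, the total wall-clock time is bounded by the slowest component or the timeout, whichever is smaller, rather than by the sum of all components.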
- async register_component(name, component)[source]
Register a component for statistics collection.
Components should implement at least one of: get_stats(), get_statistics(), get_memory_stats(), or get_health_score() methods. The aggregator will automatically detect which methods are available and use them appropriately.
- Parameters:
- Raises:
ValueError – If component name is already registered
- Return type:
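One plausible way to detect which statistics methods a component offers is a getattr() probe over the four documented method names. The sketch below assumes that approach; Registry and detect_stats_methods are hypothetical helpers, and only the method names and the ValueError on duplicate names come from the description above.

```python
from typing import Any

# Method names taken from the description above; the lookup logic is an assumption.
SUPPORTED_METHODS = (
    "get_stats",
    "get_statistics",
    "get_memory_stats",
    "get_health_score",
)


def detect_stats_methods(component: Any) -> list[str]:
    """Return the supported statistics methods that the component actually exposes."""
    return [
        name
        for name in SUPPORTED_METHODS
        if callable(getattr(component, name, None))
    ]


class Registry:
    """Hypothetical registry that rejects duplicate component names."""

    def __init__(self) -> None:
        self._components: dict[str, Any] = {}

    def register(self, name: str, component: Any) -> list[str]:
        if name in self._components:
            raise ValueError(f"Component '{name}' is already registered")
        self._components[name] = component
        # Report which collection methods were found on the component.
        return detect_stats_methods(component)
```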
- async get_comprehensive_stats()[source]
Get comprehensive statistics from all registered components.
Collects statistics from all components in parallel using asyncio.gather(), calculates cross-component metrics, and aggregates health scores. Uses TTL caching to optimize performance for repeated calls within the cache window.
- Return type:
ComprehensiveStats
- Returns:
ComprehensiveStats with suite, component, connection, and performance data
- Performance:
Parallel collection reduces total time to max component time
TTL caching prevents redundant expensive operations
Timeout protection ensures responsiveness even with failed components
- async get_suite_stats()[source]
Get TradingSuite-level statistics with cross-component metrics.
Provides suite-level view of the system including component status, cross-component metrics, and overall health scoring. Optimized for frequent polling with TTL caching and efficient component collection.
- Return type:
TradingSuiteStats
- Returns:
TradingSuiteStats with suite-level metrics and component summary
- Performance:
Lighter weight than comprehensive stats collection
Focuses on suite-level metrics and cross-component calculations
TTL caching for frequent polling scenarios
- async get_component_count()[source]
Get number of registered components.
- Return type:
- Returns:
Number of components currently registered
- async clear_all_components()[source]
Remove all registered components.
Useful for cleanup or testing scenarios.
- Return type:
- async aggregate_stats(force_refresh=False)[source]
Compatibility method for TradingSuite integration.
This method provides backward compatibility with the old StatisticsAggregator interface used by TradingSuite. New code should use get_suite_stats().
- Parameters:
force_refresh (bool) – Force refresh, bypassing the cache
- Returns:
Aggregated statistics from all components
- Return type:
Health Monitor
- class HealthMonitor(thresholds=None, weights=None)[source]
Bases: object
Comprehensive health monitoring with intelligent scoring algorithms.
Evaluates system health across multiple dimensions including error rates, performance metrics, connection stability, resource usage, and data quality. Provides actionable insights with configurable thresholds and alert levels.
- Features:
Multi-factor health scoring with weighted categories
Configurable thresholds for different environments
Smooth scoring transitions to prevent alert flapping
Graceful handling of missing statistics
Performance-optimized async calculations
Actionable alerts with specific recommendations
- __init__(thresholds=None, weights=None)[source]
Initialize the health monitor with configurable thresholds and weights.
- async calculate_health(stats)[source]
Calculate overall health score based on comprehensive statistics.
- Parameters:
stats (ComprehensiveStats) – Comprehensive statistics from all components
- Return type:
- Returns:
Health score between 0-100 (100 = perfect health)
- async get_health_breakdown(stats)[source]
Get detailed breakdown of health scores by category.
- Parameters:
stats (ComprehensiveStats) – Comprehensive statistics from all components
- Return type:
HealthBreakdown
- Returns:
Detailed health breakdown with scores for each category
- async get_health_alerts(stats)[source]
Generate health alerts based on current statistics.
- Parameters:
stats (ComprehensiveStats) – Comprehensive statistics from all components
- Return type:
list[HealthAlert]
- Returns:
List of health alerts with severity levels and recommendations
Base Statistics Tracker
- class BaseStatisticsTracker(component_name, max_errors=100, cache_ttl=5.0)[source]
Bases: object
Base class for async statistics tracking with thread safety and caching.
Provides foundational statistics tracking capabilities including counters, gauges, timing data, error tracking, and health scoring. All operations are async-safe using asyncio.Lock and include TTL caching for expensive operations.
- Features:
Async-safe counters and gauges using asyncio.Lock
Efficient memory tracking with caching
Error history with circular buffer (maxlen=100)
Performance timing tracking
Health scoring algorithm (0-100 scale)
TTL cache for expensive operations (5-second default)
Single lock per component to prevent deadlocks
- async track_error(error, context, details=None)[source]
Track an error occurrence with context and details.
- async get_memory_usage()[source]
Get estimated memory usage in MB.
Override in subclasses for component-specific memory calculations.
- Return type:
- async get_health_score()[source]
Calculate health score for this component (0-100 scale).
Health scoring factors:
- Error rate (40% weight): Lower error rate = higher score
- Uptime (20% weight): Longer uptime = higher score
- Activity (20% weight): Recent activity = higher score
- Status (20% weight): Connected status = higher score
- Return type:
- Returns:
Health score between 0-100 (100 = perfect health)
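The 40/20/20/20 weighting above amounts to a weighted sum of four factor scores. The sketch below uses the documented weights, but how each factor is converted to a 0-100 score is not specified here, so error_rate_to_score is an assumed linear mapping and both function names are hypothetical.

```python
def weighted_health_score(
    error_score: float,
    uptime_score: float,
    activity_score: float,
    status_score: float,
) -> float:
    """Combine four 0-100 factor scores using the documented 40/20/20/20 weights."""
    score = (
        0.40 * error_score
        + 0.20 * uptime_score
        + 0.20 * activity_score
        + 0.20 * status_score
    )
    # Clamp so out-of-range inputs cannot push the result outside 0-100.
    return max(0.0, min(100.0, score))


def error_rate_to_score(error_count: int, total_operations: int) -> float:
    """Assumed mapping: 0% errors -> 100, 100% errors -> 0, linear in between."""
    if total_operations <= 0:
        return 100.0  # no activity yet, so no evidence of errors
    rate = error_count / total_operations
    return 100.0 * (1.0 - min(1.0, rate))
```

Under these assumptions, a component whose error factor drops to 50 while the other three factors stay at 100 scores 80 overall, which shows how heavily the error rate dominates the result.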
Statistics Collector
TradingSuite Statistics
Getting Statistics
from project_x_py import TradingSuite

async def get_comprehensive_statistics():
    suite = await TradingSuite.create("MNQ")

    # Get comprehensive system statistics (async-first API)
    stats = await suite.get_stats()

    # Health scoring (0-100) with intelligent monitoring
    print(f"System Health: {stats['health_score']:.1f}/100")

    # Performance metrics with enhanced tracking
    print(f"API Calls: {stats['total_api_calls']}")
    print(f"Success Rate: {stats['api_success_rate']:.1%}")
    print(f"Memory Usage: {stats['memory_usage_mb']:.1f} MB")

    await suite.disconnect()
Component-Specific Statistics
async def component_statistics():
    suite = await TradingSuite.create("MNQ")

    # Component-specific statistics (all async for consistency)
    order_stats = await suite.orders.get_stats()
    print(f"Fill Rate: {order_stats['fill_rate']:.1%}")
    print(f"Average Fill Time: {order_stats['avg_fill_time_ms']:.0f}ms")

    position_stats = await suite.positions.get_stats()
    print(f"Win Rate: {position_stats.get('win_rate', 0):.1%}")

    # OrderBook statistics (if enabled)
    if suite.orderbook:
        orderbook_stats = await suite.orderbook.get_stats()
        print(f"Depth Updates: {orderbook_stats['depth_updates']}")

    await suite.disconnect()
Export Capabilities
Multi-format Export
async def export_statistics():
    suite = await TradingSuite.create("MNQ")

    # Multi-format export capabilities
    prometheus_metrics = await suite.export_stats("prometheus")
    csv_data = await suite.export_stats("csv")
    datadog_metrics = await suite.export_stats("datadog")
    json_data = await suite.export_stats("json")

    # Save to files
    with open("metrics.prom", "w") as f:
        f.write(prometheus_metrics)

    with open("stats.csv", "w") as f:
        f.write(csv_data)

    await suite.disconnect()
Health Monitoring
Real-time Health Scoring
async def monitor_health():
    suite = await TradingSuite.create("MNQ")

    # Real-time health monitoring with degradation detection
    health_score = await suite.get_health_score()
    if health_score < 70:
        print("⚠️ System health degraded - check components")

        # Get detailed component health
        component_health = await suite.get_component_health()
        for name, health in component_health.items():
            if health['error_count'] > 0:
                print(f" {name}: {health['error_count']} errors")

    await suite.disconnect()
Health Thresholds
from project_x_py import TradingSuite
from project_x_py.statistics.health import HealthThresholds, HealthMonitor

async def custom_health_monitoring():
    # Configure custom health thresholds
    thresholds = HealthThresholds(
        error_rate_warning=1.0,      # 1% error rate warning
        error_rate_critical=5.0,     # 5% error rate critical
        response_time_warning=500,   # 500ms response time warning
        memory_usage_warning=80.0    # 80% memory usage warning
    )

    # Custom category weights (these sum to 1.0)
    weights = {
        "errors": 0.30,        # Emphasize error tracking
        "performance": 0.25,   # Performance is critical
        "connection": 0.20,    # Connection stability
        "resources": 0.15,     # Resource usage
        "data_quality": 0.10,  # Data quality
    }

    # Initialize custom health monitor
    monitor = HealthMonitor(thresholds=thresholds, weights=weights)

    # Score the statistics collected by the suite
    suite = await TradingSuite.create("MNQ")
    stats = await suite.get_stats()
    health_score = await monitor.calculate_health(stats)
    print(f"Custom Health Score: {health_score:.1f}/100")

    await suite.disconnect()
Data Types
Statistics Types
- class TradingSuiteStats[source]
Bases: TypedDict
Comprehensive statistics for TradingSuite instance.
- components: dict[str, ComponentStats]
- class ComponentStats[source]
Bases: TypedDict
Statistics for individual components within TradingSuite.
- class ComprehensiveStats[source]
Bases: TypedDict
Combined statistics from all components and connections.
- suite: TradingSuiteStats
- health: NotRequired[HealthStats]
- performance: NotRequired[PerformanceStats]
- errors: NotRequired[ErrorStats]
- connections: NotRequired[ConnectionStats]
- trading: NotRequired[TradingStats]
- order_manager: NotRequired[OrderManagerStats]
- position_manager: NotRequired[PositionManagerStats]
- data_manager: NotRequired[RealtimeDataManagerStats]
- orderbook: NotRequired[OrderbookStats]
- risk_manager: NotRequired[RiskManagerStats]
- realtime: NotRequired[RealtimeConnectionStats]
- http_client: NotRequired[HTTPClientStats]
- cache: NotRequired[CacheStats]
- memory: NotRequired[MemoryUsageStats]
- class RealtimeDataManagerStats[source]
Bases: TypedDict
Statistics for RealtimeDataManager component.
Health Types
- class HealthThresholds(error_rate_excellent=1.0, error_rate_good=5.0, error_rate_warning=20.0, error_rate_critical=50.0, response_time_excellent=100.0, response_time_good=500.0, response_time_warning=2000.0, response_time_critical=5000.0, reconnection_excellent=0, reconnection_good=2, reconnection_warning=10, reconnection_critical=30, memory_usage_excellent=50.0, memory_usage_good=70.0, memory_usage_warning=85.0, memory_usage_critical=95.0, validation_error_excellent=0.1, validation_error_good=1.0, validation_error_warning=5.0, validation_error_critical=10.0)[source]
Bases: object
Configurable thresholds for health scoring.
- __init__(error_rate_excellent=1.0, error_rate_good=5.0, error_rate_warning=20.0, error_rate_critical=50.0, response_time_excellent=100.0, response_time_good=500.0, response_time_warning=2000.0, response_time_critical=5000.0, reconnection_excellent=0, reconnection_good=2, reconnection_warning=10, reconnection_critical=30, memory_usage_excellent=50.0, memory_usage_good=70.0, memory_usage_warning=85.0, memory_usage_critical=95.0, validation_error_excellent=0.1, validation_error_good=1.0, validation_error_warning=5.0, validation_error_critical=10.0)
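To illustrate how four-level thresholds like these could translate a raw metric into a score with smooth transitions, here is a piecewise-linear interpolation sketch. The anchor scores (100, 80, 50, 0) and the function name interpolate_score are assumptions made for illustration; the SDK's actual scoring curve may differ. The usage below reuses the default error-rate thresholds from the signature above.

```python
from bisect import bisect_right


def interpolate_score(
    value: float,
    excellent: float,
    good: float,
    warning: float,
    critical: float,
) -> float:
    """Map a lower-is-better metric to 0-100 by interpolating between thresholds."""
    xs = [excellent, good, warning, critical]
    # Assumed score at each threshold; not taken from the SDK.
    ys = [100.0, 80.0, 50.0, 0.0]

    if value <= xs[0]:
        return ys[0]
    if value >= xs[-1]:
        return ys[-1]

    # Find the segment [xs[i], xs[i + 1]) that contains the value.
    i = bisect_right(xs, value) - 1
    x0, x1 = xs[i], xs[i + 1]
    y0, y1 = ys[i], ys[i + 1]
    return y0 + (y1 - y0) * (value - x0) / (x1 - x0)


# Default error-rate thresholds from HealthThresholds: 1.0, 5.0, 20.0, 50.0
error_rate_score = interpolate_score(3.0, 1.0, 5.0, 20.0, 50.0)
```

Interpolating between thresholds, rather than jumping between fixed bands, means a metric hovering near a boundary changes the score gradually, which is one way to avoid alert flapping.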
Performance Considerations
Caching Strategy
The statistics system uses TTL caching to optimize performance:
Default TTL: 5 seconds for expensive operations
Parallel Collection: Components collected concurrently using asyncio.gather()
Timeout Protection: 1 second timeout per component prevents hanging
Graceful Degradation: Partial results returned if some components fail
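A TTL cache of this kind can be sketched in a few lines. TTLCache below is a hypothetical helper, not the SDK's class; the injectable clock parameter is an addition that makes expiry easy to exercise without sleeping.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable


class TTLCache:
    """Cache one async result for ttl seconds (illustrative helper only)."""

    def __init__(
        self, ttl: float = 5.0, clock: Callable[[], float] = time.monotonic
    ) -> None:
        self._ttl = ttl
        self._clock = clock
        self._value: Any = None
        self._expires_at = float("-inf")  # forces a computation on first use
        self._lock = asyncio.Lock()

    async def get(
        self,
        compute: Callable[[], Awaitable[Any]],
        force_refresh: bool = False,
    ) -> Any:
        # The lock ensures concurrent callers do not recompute simultaneously.
        async with self._lock:
            if force_refresh or self._clock() >= self._expires_at:
                self._value = await compute()
                self._expires_at = self._clock() + self._ttl
            return self._value
```

Callers that poll more often than the TTL receive the cached value, and a force_refresh flag (mirroring the aggregate_stats parameter of the same name) bypasses the cache when fresh data is required.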
Memory Management
Circular Buffers: Error history limited to 100 entries per component
Bounded Statistics: Maximum limits prevent memory exhaustion
Lock-free Reads: Frequently accessed metrics use atomic operations
Automatic Cleanup: Old data cleaned up based on configurable parameters
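A circular buffer for error history can be built on collections.deque with a maxlen, which discards the oldest entry automatically once the limit is reached. ErrorHistory and its record fields below are hypothetical; only the 100-entry default comes from the text above.

```python
from collections import deque
from datetime import datetime, timezone
from typing import Any, Optional


class ErrorHistory:
    """Bounded error log: keeps only the most recent max_errors entries."""

    def __init__(self, max_errors: int = 100) -> None:
        self._errors: deque = deque(maxlen=max_errors)
        self.total_errors = 0  # lifetime count, unaffected by eviction

    def track(
        self,
        error: Exception,
        context: str,
        details: Optional[dict[str, Any]] = None,
    ) -> None:
        self.total_errors += 1
        # Appending to a full deque silently drops the oldest record.
        self._errors.append(
            {
                "timestamp": datetime.now(timezone.utc),
                "error_type": type(error).__name__,
                "message": str(error),
                "context": context,
                "details": details or {},
            }
        )

    def recent(self, n: int = 10) -> list[dict[str, Any]]:
        """Return up to n of the most recent errors, oldest first."""
        if n <= 0:
            return []
        return list(self._errors)[-n:]
```

Memory use stays constant regardless of how many errors occur, while the separate total_errors counter preserves the overall count for error-rate calculations.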
Best Practices
Monitor Health Regularly: Check health scores to detect issues early
Use Appropriate Export Formats: Prometheus for monitoring, CSV for analysis
Configure Thresholds: Adjust health thresholds based on your environment
Handle Degradation: Implement alerts for health score drops
Regular Exports: Export statistics periodically for historical analysis
Example: Production Monitoring
import asyncio
from project_x_py import TradingSuite

async def production_monitoring():
    """Complete production monitoring example."""
    suite = await TradingSuite.create(
        "MNQ",
        features=["orderbook", "risk_manager"]
    )

    try:
        # Run monitoring loop
        while True:
            try:
                # Get comprehensive statistics
                stats = await suite.get_stats()

                # Check system health
                health = stats.get('health_score', 0)
                if health < 80:
                    print(f"⚠️ Health Alert: {health:.1f}/100")

                    # Get component breakdown
                    component_health = await suite.get_component_health()
                    for name, metrics in component_health.items():
                        if metrics['error_count'] > 5:
                            print(f" {name}: {metrics['error_count']} errors")

                # Export metrics for monitoring system
                prometheus_data = await suite.export_stats("prometheus")

                # Save to monitoring endpoint (example)
                # await send_to_monitoring_system(prometheus_data)

                # Performance metrics
                print(f"API Success Rate: {stats.get('api_success_rate', 0):.1%}")
                print(f"Memory Usage: {stats.get('memory_usage_mb', 0):.1f} MB")

                # Wait before next check
                await asyncio.sleep(30)  # Check every 30 seconds

            except Exception as e:
                print(f"Monitoring error: {e}")
                await asyncio.sleep(60)  # Longer wait on error
    finally:
        # Disconnect when the loop is cancelled or interrupted
        await suite.disconnect()

# Run monitoring
asyncio.run(production_monitoring())
Integration Examples
Prometheus Integration
async def prometheus_integration():
    suite = await TradingSuite.create("MNQ")

    # Export Prometheus metrics
    metrics = await suite.export_stats("prometheus")

    # Example Prometheus metrics format:
    # # HELP projectx_api_calls_total Total API calls
    # # TYPE projectx_api_calls_total counter
    # projectx_api_calls_total 1234
    #
    # # HELP projectx_health_score Current health score
    # # TYPE projectx_health_score gauge
    # projectx_health_score 85.5

    # Send to Prometheus pushgateway
    # requests.post('http://pushgateway:9091/metrics/job/projectx',
    #               data=metrics)

    await suite.disconnect()
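The text format shown in the comments above follows the Prometheus exposition convention of a HELP line, a TYPE line, and a sample line per metric. As a rough illustration of how such output could be produced from plain values, here is a small formatter; to_prometheus and its input shape are hypothetical and independent of how export_stats("prometheus") is actually implemented.

```python
def to_prometheus(
    metrics: dict[str, tuple[str, str, float]], prefix: str = "projectx"
) -> str:
    """Render {name: (type, help_text, value)} as Prometheus exposition text."""
    lines: list[str] = []
    for name, (metric_type, help_text, value) in metrics.items():
        full_name = f"{prefix}_{name}"
        lines.append(f"# HELP {full_name} {help_text}")
        lines.append(f"# TYPE {full_name} {metric_type}")
        lines.append(f"{full_name} {value}")
    # The exposition format expects a trailing newline after the last sample.
    return "\n".join(lines) + "\n"


sample = to_prometheus(
    {
        "api_calls_total": ("counter", "Total API calls", 1234),
        "health_score": ("gauge", "Current health score", 85.5),
    }
)
```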
Datadog Integration
async def datadog_integration():
    suite = await TradingSuite.create("MNQ")

    # Export Datadog-compatible metrics
    metrics = await suite.export_stats("datadog")

    # Example: Send to Datadog (requires datadog library)
    # from datadog import api
    #
    # for metric in metrics:
    #     api.Metric.send(
    #         metric='projectx.health_score',
    #         points=metric['value'],
    #         tags=['environment:production']
    #     )

    await suite.disconnect()
CSV Analytics
async def csv_analytics():
    suite = await TradingSuite.create("MNQ")

    # Export CSV for analytics
    csv_data = await suite.export_stats("csv")

    # Save for analysis
    with open("trading_stats.csv", "w") as f:
        f.write(csv_data)

    # Example: Load with pandas for analysis
    # import pandas as pd
    # df = pd.read_csv("trading_stats.csv")
    # print(df.describe())

    await suite.disconnect()
Troubleshooting
Common Issues
- High Error Rates
Check component error counts and logs for specific issues.
- Low Health Scores
Review individual component health metrics to identify bottlenecks.
- Memory Usage Spikes
Monitor circular buffer sizes and cleanup frequencies.
- Slow Statistics Collection
Check network connectivity and component response times.
Debugging
import logging

async def debug_statistics():
    suite = await TradingSuite.create("MNQ")

    # Enable debug logging for the statistics package
    logging.getLogger("project_x_py.statistics").setLevel(logging.DEBUG)

    # Get raw component statistics, skipping components that are not present
    for component_name in ["orders", "positions", "data"]:
        if hasattr(suite, component_name):
            component = getattr(suite, component_name)
            if hasattr(component, "get_stats"):
                stats = await component.get_stats()
                print(f"{component_name}: {stats}")

    await suite.disconnect()
Examples
The repository includes comprehensive examples demonstrating the statistics system:
20_statistics_usage.py - Complete statistics system demonstration
24_bounded_statistics_demo.py - Memory-bounded statistics with limits
19_risk_manager_live_demo.py - Risk manager statistics in action
22_circuit_breaker_protection.py - Health monitoring with circuit breakers
See Also
Trading API - Order and position management
Data API - Real-time data management
OrderBook API - Level 2 orderbook analysis
Utilities - Utility functions and helpers