Statistics & Analytics API

Comprehensive async-first statistics system with health monitoring and performance tracking (v3.3.0+).

Overview

The statistics system provides centralized collection and analysis of performance metrics across all SDK components. Features include:

  • 100% Async Architecture: All statistics methods use async/await for optimal performance

  • Multi-format Export: JSON, Prometheus, CSV, and Datadog formats with data sanitization

  • Component-Specific Tracking: Enhanced statistics for all managers with specialized metrics

  • Health Monitoring: Intelligent 0-100 health scoring with configurable thresholds

  • Performance Optimization: TTL caching, parallel collection, and circular buffers

  • Memory Efficiency: Circular buffers and lock-free reads for frequently accessed metrics

Core Components

Statistics Aggregator

class StatisticsAggregator(cache_ttl=5.0, component_timeout=1.0)[source]

Bases: BaseStatisticsTracker

Centralized statistics aggregation for all ProjectX SDK components.

Provides parallel collection from registered components, cross-component metrics calculation, health score aggregation, and performance optimization through TTL caching. Handles component failures gracefully with timeout protection and partial result recovery.

Features:
  • Async component registration and management

  • Parallel statistics collection using asyncio.gather()

  • Cross-component metrics calculation (total errors, combined P&L)

  • Health score aggregation with weighted averages

  • TTL caching for expensive operations (5-second default)

  • Timeout protection (1 second per component)

  • Graceful error handling with partial results

  • Type-safe statistics using ComprehensiveStats

Performance Optimizations:
  • Parallel collection bounds total collection time by the slowest component rather than the sum of all components

  • TTL caching prevents redundant calculations within cache window

  • Timeout protection prevents hanging on failed components

  • Memory-efficient handling of large statistics datasets

__init__(cache_ttl=5.0, component_timeout=1.0)[source]

Initialize the statistics aggregator.

Parameters:
  • cache_ttl (float) – Cache TTL in seconds for expensive operations (default: 5.0)

  • component_timeout (float) – Timeout in seconds for individual component collection (default: 1.0)

async register_component(name, component)[source]

Register a component for statistics collection.

Components should implement at least one of: get_stats(), get_statistics(), get_memory_stats(), or get_health_score() methods. The aggregator will automatically detect which methods are available and use them appropriately.

Parameters:
  • name (str) – Unique name for the component

  • component (Any) – Component instance to register

Raises:

ValueError – If component name is already registered

Return type:

None
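
The duck-typed detection described for register_component() can be illustrated with a small stand-alone sketch. MyFeedComponent and detect_methods are hypothetical names used only for illustration; how the aggregator actually probes a component is not shown in this reference, so treat the hasattr-style check as an assumption.

```python
from typing import Any

# Method names taken from the register_component() description above.
SUPPORTED_METHODS = (
    "get_stats",
    "get_statistics",
    "get_memory_stats",
    "get_health_score",
)


class MyFeedComponent:
    """Hypothetical component exposing two of the supported methods."""

    def __init__(self) -> None:
        self.messages = 0
        self.errors = 0

    async def get_stats(self) -> dict[str, Any]:
        return {"messages": self.messages, "error_count": self.errors}

    async def get_health_score(self) -> float:
        # Illustrative rule only: any error halves the score.
        return 100.0 if self.errors == 0 else 50.0


def detect_methods(component: Any) -> list[str]:
    """Return the supported statistics methods a component implements."""
    return [m for m in SUPPORTED_METHODS if callable(getattr(component, m, None))]
```

Given the signature documented above, such a component would be registered with `await aggregator.register_component("feed", MyFeedComponent())`.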

async unregister_component(name)[source]

Remove a component from statistics collection.

Parameters:

name (str) – Name of the component to remove

Raises:

KeyError – If component name is not registered

Return type:

None

async get_comprehensive_stats()[source]

Get comprehensive statistics from all registered components.

Collects statistics from all components in parallel using asyncio.gather(), calculates cross-component metrics, and aggregates health scores. Uses TTL caching to optimize performance for repeated calls within the cache window.

Return type:

ComprehensiveStats

Returns:

ComprehensiveStats with suite, component, connection, and performance data

Performance:
  • Parallel collection bounds total collection time by the slowest component rather than the sum of all components

  • TTL caching prevents redundant expensive operations

  • Timeout protection ensures responsiveness even with failed components
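
The collection behaviour described above (parallel gather, per-component timeout, partial results) can be sketched with the standard library alone. This is a minimal illustration under stated assumptions, not the SDK's implementation: collect_all, FastComponent and SlowComponent are hypothetical names, and dropping failed components from the result is one possible way to return partial results.

```python
import asyncio
from typing import Any


class FastComponent:
    async def get_stats(self) -> dict[str, Any]:
        await asyncio.sleep(0.01)
        return {"status": "connected", "error_count": 0}


class SlowComponent:
    async def get_stats(self) -> dict[str, Any]:
        await asyncio.sleep(5.0)  # deliberately exceeds the timeout used in demo()
        return {"status": "connected", "error_count": 0}


async def collect_all(
    components: dict[str, Any], timeout: float = 1.0
) -> dict[str, dict[str, Any]]:
    """Collect stats concurrently; omit components that fail or time out."""

    async def collect_one(component: Any) -> dict[str, Any]:
        return await asyncio.wait_for(component.get_stats(), timeout=timeout)

    names = list(components)
    # return_exceptions=True keeps one failure from cancelling the others.
    results = await asyncio.gather(
        *(collect_one(components[name]) for name in names),
        return_exceptions=True,
    )
    return {
        name: result
        for name, result in zip(names, results)
        if not isinstance(result, BaseException)
    }


def demo() -> dict[str, dict[str, Any]]:
    components = {"orders": FastComponent(), "data": SlowComponent()}
    return asyncio.run(collect_all(components, timeout=0.1))
```

Because the coroutines run concurrently, demo() takes roughly the timeout (0.1 s) rather than the sum of the component delays.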

async get_suite_stats()[source]

Get TradingSuite-level statistics with cross-component metrics.

Provides suite-level view of the system including component status, cross-component metrics, and overall health scoring. Optimized for frequent polling with TTL caching and efficient component collection.

Return type:

TradingSuiteStats

Returns:

TradingSuiteStats with suite-level metrics and component summary

Performance:
  • Lighter weight than comprehensive stats collection

  • Focuses on suite-level metrics and cross-component calculations

  • TTL caching for frequent polling scenarios
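
As a rough illustration of what "cross-component calculations" can mean, the sketch below derives suite-level values from a components mapping shaped like dict[str, ComponentStats]. The function name summarize_components and the rule that the suite counts as connected only when every component is connected are assumptions made for this example.

```python
from typing import Any


def summarize_components(components: dict[str, dict[str, Any]]) -> dict[str, Any]:
    """Derive suite-level totals from per-component statistics."""
    values = list(components.values())
    return {
        # Sum of per-component error counters.
        "total_errors": sum(c.get("error_count", 0) for c in values),
        # Sum of per-component memory estimates.
        "memory_usage_mb": sum(c.get("memory_usage_mb", 0.0) for c in values),
        # Assumed rule: connected only if there is at least one component
        # and every component reports the "connected" status.
        "connected": bool(values) and all(c.get("status") == "connected" for c in values),
    }


summary = summarize_components(
    {
        "orders": {"status": "connected", "error_count": 2, "memory_usage_mb": 1.5},
        "data": {"status": "error", "error_count": 3, "memory_usage_mb": 4.0},
    }
)
```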

async get_registered_components()[source]

Get list of registered component names.

Return type:

list[str]

Returns:

List of component names currently registered

async get_component_count()[source]

Get number of registered components.

Return type:

int

Returns:

Number of components currently registered

async clear_all_components()[source]

Remove all registered components.

Useful for cleanup or testing scenarios.

Return type:

None

async aggregate_stats(force_refresh=False)[source]

Compatibility method for TradingSuite integration.

This method provides backward compatibility with the old StatisticsAggregator interface used by TradingSuite. New code should use get_suite_stats().

Parameters:

force_refresh (bool) – Force refresh bypassing cache

Returns:

Aggregated statistics from all components

Return type:

TradingSuiteStats

__setattr__(name, value)[source]

Compatibility layer for direct component assignment.

Supports the old pattern where TradingSuite sets components directly, for example aggregator.order_manager = order_manager and aggregator.data_manager = data_manager.

Return type:

None

Health Monitor

class HealthMonitor(thresholds=None, weights=None)[source]

Bases: object

Comprehensive health monitoring with intelligent scoring algorithms.

Evaluates system health across multiple dimensions including error rates, performance metrics, connection stability, resource usage, and data quality. Provides actionable insights with configurable thresholds and alert levels.

Features:
  • Multi-factor health scoring with weighted categories

  • Configurable thresholds for different environments

  • Smooth scoring transitions to prevent alert flapping

  • Graceful handling of missing statistics

  • Performance-optimized async calculations

  • Actionable alerts with specific recommendations

__init__(thresholds=None, weights=None)[source]

Initialize the health monitor with configurable thresholds and weights.

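Parameters:
  • thresholds (HealthThresholds | None) – Custom scoring thresholds; defaults are used when None

  • weights (dict[str, float] | None) – Custom category weights; defaults are used when None

async calculate_health(stats)[source]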

Calculate overall health score based on comprehensive statistics.

Parameters:

stats (ComprehensiveStats) – Comprehensive statistics from all components

Return type:

float

Returns:

Health score between 0-100 (100 = perfect health)

async get_health_breakdown(stats)[source]

Get detailed breakdown of health scores by category.

Parameters:

stats (ComprehensiveStats) – Comprehensive statistics from all components

Return type:

HealthBreakdown

Returns:

Detailed health breakdown with scores for each category

async get_health_alerts(stats)[source]

Generate health alerts based on current statistics.

Parameters:

stats (ComprehensiveStats) – Comprehensive statistics from all components

Return type:

list[HealthAlert]

Returns:

List of health alerts with severity levels and recommendations

Base Statistics Tracker

class BaseStatisticsTracker(component_name, max_errors=100, cache_ttl=5.0)[source]

Bases: object

Base class for async statistics tracking with coroutine-safe access and caching.

Provides foundational statistics tracking capabilities including counters, gauges, timing data, error tracking, and health scoring. All operations are async-safe using asyncio.Lock and include TTL caching for expensive operations.

Features:
  • Async-safe counters and gauges using asyncio.Lock

  • Efficient memory tracking with caching

  • Error history with circular buffer (maxlen=100)

  • Performance timing tracking

  • Health scoring algorithm (0-100 scale)

  • TTL cache for expensive operations (5-second default)

  • Single lock per component to prevent deadlocks
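
The bounded error history can be pictured with collections.deque, which evicts the oldest entry once maxlen is reached. This is a simplified, synchronous sketch: ErrorHistory is a hypothetical class, and keeping a separate lifetime counter next to the bounded buffer is an assumption rather than documented SDK behaviour.

```python
from collections import deque
from typing import Any, Union


class ErrorHistory:
    """Keeps only the most recent max_errors entries."""

    def __init__(self, max_errors: int = 100) -> None:
        self._errors = deque(maxlen=max_errors)  # oldest entries are evicted
        self.total_count = 0  # lifetime count, not bounded by maxlen

    def track(self, error: Union[Exception, str], context: str) -> None:
        self.total_count += 1
        self._errors.append({"error": str(error), "context": context})

    def recent(self, limit: int = 10) -> list[dict[str, Any]]:
        if limit <= 0:
            return []
        return list(self._errors)[-limit:]


history = ErrorHistory(max_errors=3)
for i in range(5):
    history.track(f"error {i}", context="demo")
```

After five errors with max_errors=3, only the last three remain in the buffer while the lifetime counter still reads five.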

__init__(component_name, max_errors=100, cache_ttl=5.0)[source]

Initialize the statistics tracker.

Parameters:
  • component_name (str) – Name of the component being tracked

  • max_errors (int) – Maximum number of errors to keep in history

  • cache_ttl (float) – Cache TTL in seconds for expensive operations

async increment(metric, value=1)[source]

Increment a counter metric by the specified value.

Parameters:
  • metric (str) – Name of the metric to increment

  • value (int | float) – Value to increment by (default: 1)

Return type:

None

async set_gauge(metric, value)[source]

Set a gauge metric to the specified value.

Parameters:
  • metric (str) – Name of the gauge metric

  • value (int | float | Decimal) – Value to set the gauge to

Return type:

None

async record_timing(operation, duration_ms)[source]

Record timing information for an operation.

Parameters:
  • operation (str) – Name of the operation being timed

  • duration_ms (float) – Duration in milliseconds

Return type:

None
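
Since record_timing() expects a duration in milliseconds, a small async context manager is a convenient way to measure and report it. The sketch below is an assumption-laden illustration: timed is a hypothetical helper, and RecordingTracker is a stand-in that only mimics the record_timing(operation, duration_ms) signature documented above.

```python
import asyncio
import time
from contextlib import asynccontextmanager


class RecordingTracker:
    """Stand-in exposing the documented record_timing() signature."""

    def __init__(self) -> None:
        self.timings = {}

    async def record_timing(self, operation: str, duration_ms: float) -> None:
        self.timings.setdefault(operation, []).append(duration_ms)


@asynccontextmanager
async def timed(tracker, operation: str):
    """Measure the wrapped block and report its duration in milliseconds."""
    start = time.perf_counter()
    try:
        yield
    finally:
        # Recorded even if the wrapped block raises.
        elapsed_ms = (time.perf_counter() - start) * 1000.0
        await tracker.record_timing(operation, elapsed_ms)


async def demo() -> dict:
    tracker = RecordingTracker()
    async with timed(tracker, "place_order"):
        await asyncio.sleep(0.01)
    return tracker.timings
```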

async track_error(error, context, details=None)[source]

Track an error occurrence with context and details.

Parameters:
  • error (Exception | str) – The error that occurred

  • context (str) – Context in which the error occurred

  • details (dict[str, Any] | None) – Additional error details

Return type:

None

async get_error_count()[source]

Get total number of errors tracked.

Return type:

int

async get_recent_errors(limit=10)[source]

Get recent errors.

Parameters:

limit (int) – Maximum number of errors to return

Return type:

list[dict[str, Any]]

Returns:

List of recent error dictionaries

async set_status(status)[source]

Set the component status.

Parameters:

status (str) – New status (“connected”, “disconnected”, “error”, “initializing”)

Return type:

None

async get_status()[source]

Get current component status.

Return type:

str

async get_uptime()[source]

Get component uptime in seconds.

Return type:

int

async get_memory_usage()[source]

Get estimated memory usage in MB.

Override in subclasses for component-specific memory calculations.

Return type:

float

async get_health_score()[source]

Calculate health score for this component (0-100 scale).

Health scoring factors:
  • Error rate (40% weight): Lower error rate = higher score

  • Uptime (20% weight): Longer uptime = higher score

  • Activity (20% weight): Recent activity = higher score

  • Status (20% weight): Connected status = higher score

Return type:

float

Returns:

Health score between 0-100 (100 = perfect health)
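
The weighting above amounts to a weighted sum of four sub-scores. The sketch below assumes each sub-score is already on a 0-100 scale; how the SDK derives those sub-scores is not described here, so they are plain inputs, and weighted_health is a hypothetical name.

```python
# Weights as listed for get_health_score(); sub-scores are hypothetical inputs.
WEIGHTS = {"error_rate": 0.40, "uptime": 0.20, "activity": 0.20, "status": 0.20}


def weighted_health(scores: dict[str, float]) -> float:
    """Combine 0-100 sub-scores into one 0-100 health score."""
    total = sum(WEIGHTS[name] * scores[name] for name in WEIGHTS)
    return max(0.0, min(100.0, total))  # clamp against rounding drift


# A component with a mediocre error-rate score but otherwise healthy:
example = weighted_health(
    {"error_rate": 50.0, "uptime": 100.0, "activity": 100.0, "status": 100.0}
)
```

Here the error-rate sub-score of 50 contributes 20 points and the other three factors contribute 20 points each, giving 80.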

async get_stats()[source]

Get current statistics for this component.

Return type:

ComponentStats

Returns:

ComponentStats with current metrics and status

async cleanup_cache()[source]

Clean up expired cache entries.

Return type:

None

async reset_metrics()[source]

Reset all metrics and statistics.

Return type:

None

Statistics Collector

TradingSuite Statistics

Getting Statistics

from project_x_py import TradingSuite

async def get_comprehensive_statistics():
    suite = await TradingSuite.create("MNQ")

    # Get comprehensive system statistics (async-first API)
    stats = await suite.get_stats()

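    # Health scoring (0-100); health_score is NotRequired, so read it defensively
    print(f"System Health: {stats.get('health_score', 0.0):.1f}/100")

    # Performance metrics; success rate derived from the TradingSuiteStats counters
    total_calls = stats['total_api_calls']
    success_rate = stats['successful_api_calls'] / total_calls if total_calls else 0.0
    print(f"API Calls: {total_calls}")
    print(f"Success Rate: {success_rate:.1%}")
    print(f"Memory Usage: {stats['memory_usage_mb']:.1f} MB")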

    await suite.disconnect()

Component-Specific Statistics

async def component_statistics():
    suite = await TradingSuite.create("MNQ")

    # Component-specific statistics (all async for consistency)
    order_stats = await suite.orders.get_stats()
    print(f"Fill Rate: {order_stats['fill_rate']:.1%}")
    print(f"Average Fill Time: {order_stats['avg_fill_time_ms']:.0f}ms")

    position_stats = await suite.positions.get_stats()
    print(f"Win Rate: {position_stats.get('win_rate', 0):.1%}")

    # OrderBook statistics (if enabled)
    if suite.orderbook:
        orderbook_stats = await suite.orderbook.get_stats()
        print(f"Depth Updates: {orderbook_stats['depth_updates']}")

    await suite.disconnect()

Export Capabilities

Multi-format Export

async def export_statistics():
    suite = await TradingSuite.create("MNQ")

    # Multi-format export capabilities
    prometheus_metrics = await suite.export_stats("prometheus")
    csv_data = await suite.export_stats("csv")
    datadog_metrics = await suite.export_stats("datadog")
    json_data = await suite.export_stats("json")

    # Save to files
    with open("metrics.prom", "w") as f:
        f.write(prometheus_metrics)

    with open("stats.csv", "w") as f:
        f.write(csv_data)

    await suite.disconnect()

Health Monitoring

Real-time Health Scoring

async def monitor_health():
    suite = await TradingSuite.create("MNQ")

    # Real-time health monitoring with degradation detection
    health_score = await suite.get_health_score()
    if health_score < 70:
        print("⚠️ System health degraded - check components")

        # Get detailed component health
        component_health = await suite.get_component_health()
        for name, health in component_health.items():
            if health['error_count'] > 0:
                print(f"  {name}: {health['error_count']} errors")

    await suite.disconnect()

Health Thresholds

from project_x_py import TradingSuite
from project_x_py.statistics.health import HealthThresholds, HealthMonitor

async def custom_health_monitoring():
    # Configure custom health thresholds
    thresholds = HealthThresholds(
        error_rate_warning=1.0,    # 1% error rate warning
        error_rate_critical=5.0,   # 5% error rate critical
        response_time_warning=500, # 500ms response time warning
        memory_usage_warning=80.0  # 80% memory usage warning
    )

    # Custom category weights
    weights = {
        "errors": 0.30,         # Emphasize error tracking
        "performance": 0.25,    # Performance is critical
        "connection": 0.20,     # Connection stability
        "resources": 0.15,      # Resource usage
        "data_quality": 0.10,   # Data quality
    }

    # Initialize custom health monitor
    monitor = HealthMonitor(thresholds=thresholds, weights=weights)

    # Score statistics collected from the suite
    suite = await TradingSuite.create("MNQ")
    stats = await suite.get_stats()
    health_score = await monitor.calculate_health(stats)

    print(f"Custom Health Score: {health_score:.1f}/100")

    await suite.disconnect()

Data Types

Statistics Types

class TradingSuiteStats[source]

Bases: TypedDict

Comprehensive statistics for TradingSuite instance.

suite_id: str
instrument: str
created_at: str
uptime_seconds: int
status: str
connected: bool
components: dict[str, ComponentStats]
realtime_connected: bool
user_hub_connected: bool
market_hub_connected: bool
total_api_calls: int
successful_api_calls: int
failed_api_calls: int
avg_response_time_ms: float
cache_hit_rate: float
memory_usage_mb: float
active_subscriptions: int
message_queue_size: int
features_enabled: list[str]
timeframes: list[str]
total_errors: NotRequired[int]
health_score: NotRequired[float]

class ComponentStats[source]

Bases: TypedDict

Statistics for individual components within TradingSuite.

name: str
status: str
uptime_seconds: int
last_activity: str | None
error_count: int
memory_usage_mb: float
performance_metrics: NotRequired[dict[str, Any]]

class ComprehensiveStats[source]

Bases: TypedDict

Combined statistics from all components and connections.

suite: TradingSuiteStats
health: NotRequired[HealthStats]
performance: NotRequired[PerformanceStats]
errors: NotRequired[ErrorStats]
connections: NotRequired[ConnectionStats]
trading: NotRequired[TradingStats]
order_manager: NotRequired[OrderManagerStats]
position_manager: NotRequired[PositionManagerStats]
data_manager: NotRequired[RealtimeDataManagerStats]
orderbook: NotRequired[OrderbookStats]
risk_manager: NotRequired[RiskManagerStats]
realtime: NotRequired[RealtimeConnectionStats]
http_client: NotRequired[HTTPClientStats]
cache: NotRequired[CacheStats]
memory: NotRequired[MemoryUsageStats]
generated_at: str
collection_time_ms: float

class OrderManagerStats[source]

Bases: TypedDict

Statistics for OrderManager component.

orders_placed: int
orders_filled: int
orders_cancelled: int
orders_rejected: int
orders_modified: int
fill_rate: float
avg_fill_time_ms: float
rejection_rate: float
market_orders: int
limit_orders: int
stop_orders: int
bracket_orders: int
last_order_time: str | None
avg_order_response_time_ms: float
fastest_fill_ms: float
slowest_fill_ms: float
total_volume: int
total_value: float
avg_order_size: float
largest_order: int
risk_violations: int
order_validation_failures: int

class PositionManagerStats[source]

Bases: TypedDict

Statistics for PositionManager component.

open_positions: int
closed_positions: int
total_positions: int
total_pnl: float
realized_pnl: float
unrealized_pnl: float
best_position_pnl: float
worst_position_pnl: float
avg_position_size: float
largest_position: int
avg_hold_time_minutes: float
longest_hold_time_minutes: float
win_rate: float
profit_factor: float
sharpe_ratio: float
max_drawdown: float
total_risk: float
max_position_risk: float
portfolio_correlation: float
var_95: float
position_updates: int
risk_calculations: int
last_position_update: str | None

class RealtimeDataManagerStats[source]

Bases: TypedDict

Statistics for RealtimeDataManager component.

bars_processed: int
ticks_processed: int
quotes_processed: int
trades_processed: int
timeframe_stats: dict[str, dict[str, int]]
avg_processing_time_ms: float
data_latency_ms: float
buffer_utilization: float
total_bars_stored: int
memory_usage_mb: float
compression_ratio: float
updates_per_minute: float
last_update: str | None
data_freshness_seconds: float
data_validation_errors: int
connection_interruptions: int
recovery_attempts: int
overflow_stats: dict[str, Any]
buffer_overflow_stats: dict[str, Any]
lock_optimization_stats: dict[str, Any]

class RiskManagerStats[source]

Bases: TypedDict

Statistics for RiskManager component.

rules_evaluated: int
rule_violations: int
rule_warnings: int
rules_passed: int
total_risk_exposure: float
max_position_risk: float
portfolio_risk: float
var_95: float
max_loss_limit: float
daily_loss_limit: float
position_size_limit: int
leverage_limit: float
stop_losses_triggered: int
margin_calls: int
risk_alerts: int
emergency_stops: int
risk_calculations_per_second: float
avg_calculation_time_ms: float
memory_usage_mb: float
managed_trades_active: int
managed_trades_completed: int
managed_trades_stopped: int
avg_trade_duration_minutes: float
sharpe_ratio: float
sortino_ratio: float
max_drawdown: float
risk_adjusted_return: float

Health Types

class HealthThresholds(error_rate_excellent=1.0, error_rate_good=5.0, error_rate_warning=20.0, error_rate_critical=50.0, response_time_excellent=100.0, response_time_good=500.0, response_time_warning=2000.0, response_time_critical=5000.0, reconnection_excellent=0, reconnection_good=2, reconnection_warning=10, reconnection_critical=30, memory_usage_excellent=50.0, memory_usage_good=70.0, memory_usage_warning=85.0, memory_usage_critical=95.0, validation_error_excellent=0.1, validation_error_good=1.0, validation_error_warning=5.0, validation_error_critical=10.0)[source]

Bases: object

Configurable thresholds for health scoring.

error_rate_excellent: float = 1.0
error_rate_good: float = 5.0
error_rate_warning: float = 20.0
error_rate_critical: float = 50.0
response_time_excellent: float = 100.0
response_time_good: float = 500.0
response_time_warning: float = 2000.0
response_time_critical: float = 5000.0
reconnection_excellent: int = 0
reconnection_good: int = 2
reconnection_warning: int = 10
reconnection_critical: int = 30
memory_usage_excellent: float = 50.0
memory_usage_good: float = 70.0
memory_usage_warning: float = 85.0
memory_usage_critical: float = 95.0
validation_error_excellent: float = 0.1
validation_error_good: float = 1.0
validation_error_warning: float = 5.0
validation_error_critical: float = 10.0

__init__(error_rate_excellent=1.0, error_rate_good=5.0, error_rate_warning=20.0, error_rate_critical=50.0, response_time_excellent=100.0, response_time_good=500.0, response_time_warning=2000.0, response_time_critical=5000.0, reconnection_excellent=0, reconnection_good=2, reconnection_warning=10, reconnection_critical=30, memory_usage_excellent=50.0, memory_usage_good=70.0, memory_usage_warning=85.0, memory_usage_critical=95.0, validation_error_excellent=0.1, validation_error_good=1.0, validation_error_warning=5.0, validation_error_critical=10.0)
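
One way to turn a four-level threshold set into a smooth 0-100 score is piecewise-linear interpolation between the levels, which avoids abrupt jumps when a metric hovers near a boundary. The sketch below is only an illustration of that idea: threshold_score is a hypothetical function, the anchor scores (100, 80, 50, 0) are assumptions, and the thresholds are assumed to be strictly increasing. The SDK's actual scoring curve is not documented here.

```python
def threshold_score(
    value: float, excellent: float, good: float, warning: float, critical: float
) -> float:
    """Map a 'lower is better' metric to a 0-100 score by linear interpolation."""
    # (threshold, score) anchor points; the score values are illustrative assumptions.
    points = [(excellent, 100.0), (good, 80.0), (warning, 50.0), (critical, 0.0)]
    if value <= excellent:
        return 100.0
    if value >= critical:
        return 0.0
    for (x0, y0), (x1, y1) in zip(points, points[1:]):
        if x0 <= value <= x1:
            # Linear interpolation within the matching segment.
            return y0 + (y1 - y0) * (value - x0) / (x1 - x0)
    return 0.0  # unreachable when thresholds are strictly increasing


# Using the default error-rate thresholds listed above (1.0, 5.0, 20.0, 50.0):
score = threshold_score(12.5, excellent=1.0, good=5.0, warning=20.0, critical=50.0)
```

With these assumed anchors, an error rate of 12.5 sits halfway between the good and warning thresholds and scores 65.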

Performance Considerations

Caching Strategy

The statistics system uses TTL caching to optimize performance:

  • Default TTL: 5 seconds for expensive operations

  • Parallel Collection: Components collected concurrently using asyncio.gather()

  • Timeout Protection: 1 second timeout per component prevents hanging

  • Graceful Degradation: Partial results returned if some components fail
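
The TTL caching idea can be shown in a few lines: a cached value is reused until it is older than the TTL, after which the expensive coroutine runs again. TTLCache and get_or_compute are hypothetical names for this sketch and do not reflect the SDK's internal cache.

```python
import asyncio
import time


class TTLCache:
    """Minimal time-to-live cache for results of async computations."""

    def __init__(self, ttl: float = 5.0) -> None:
        self.ttl = ttl
        self._store = {}  # key -> (stored_at, value)

    async def get_or_compute(self, key, compute):
        entry = self._store.get(key)
        if entry is not None and time.monotonic() - entry[0] < self.ttl:
            return entry[1]  # still fresh: skip the expensive call
        value = await compute()
        self._store[key] = (time.monotonic(), value)
        return value


async def demo(ttl: float) -> int:
    calls = 0

    async def expensive() -> int:
        nonlocal calls
        calls += 1
        return calls

    cache = TTLCache(ttl=ttl)
    await cache.get_or_compute("stats", expensive)
    await cache.get_or_compute("stats", expensive)
    return calls  # number of times expensive() actually ran
```

time.monotonic() is used instead of time.time() so that wall-clock adjustments cannot make entries expire early or live too long.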

Memory Management

  • Circular Buffers: Error history limited to 100 entries per component

  • Bounded Statistics: Maximum limits prevent memory exhaustion

  • Lock-free Reads: Frequently accessed metrics use atomic operations

  • Automatic Cleanup: Old data cleaned up based on configurable parameters

Best Practices

  1. Monitor Health Regularly: Check health scores to detect issues early

  2. Use Appropriate Export Formats: Prometheus for monitoring, CSV for analysis

  3. Configure Thresholds: Adjust health thresholds based on your environment

  4. Handle Degradation: Implement alerts for health score drops

  5. Regular Exports: Export statistics periodically for historical analysis

Example: Production Monitoring

import asyncio
from project_x_py import TradingSuite

async def production_monitoring():
    """Complete production monitoring example."""
    suite = await TradingSuite.create(
        "MNQ",
        features=["orderbook", "risk_manager"]
    )

    # Run monitoring loop
    while True:
        try:
            # Get comprehensive statistics
            stats = await suite.get_stats()

            # Check system health
            health = stats.get('health_score', 0)
            if health < 80:
                print(f"⚠️ Health Alert: {health:.1f}/100")

                # Get component breakdown
                component_health = await suite.get_component_health()
                for name, metrics in component_health.items():
                    if metrics['error_count'] > 5:
                        print(f"  {name}: {metrics['error_count']} errors")

            # Export metrics for monitoring system
            prometheus_data = await suite.export_stats("prometheus")

            # Save to monitoring endpoint (example)
            # await send_to_monitoring_system(prometheus_data)

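            # Performance metrics (success rate derived from TradingSuiteStats counters)
            total_calls = stats.get('total_api_calls', 0)
            success_rate = stats.get('successful_api_calls', 0) / total_calls if total_calls else 0.0
            print(f"API Success Rate: {success_rate:.1%}")
            print(f"Memory Usage: {stats.get('memory_usage_mb', 0):.1f} MB")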

            # Wait before next check
            await asyncio.sleep(30)  # Check every 30 seconds

        except Exception as e:
            print(f"Monitoring error: {e}")
            await asyncio.sleep(60)  # Longer wait on error

# Run monitoring
asyncio.run(production_monitoring())

Integration Examples

Prometheus Integration

async def prometheus_integration():
    suite = await TradingSuite.create("MNQ")

    # Export Prometheus metrics
    metrics = await suite.export_stats("prometheus")

    # Example Prometheus metrics format:
    # # HELP projectx_api_calls_total Total API calls
    # # TYPE projectx_api_calls_total counter
    # projectx_api_calls_total 1234
    #
    # # HELP projectx_health_score Current health score
    # # TYPE projectx_health_score gauge
    # projectx_health_score 85.5

    # Send to Prometheus pushgateway
    # requests.post('http://pushgateway:9091/metrics/job/projectx',
    #               data=metrics)

    await suite.disconnect()
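
For readers unfamiliar with the Prometheus text exposition format shown in the comments above, the following sketch renders a few metrics in that layout. It is not the SDK's exporter: to_prometheus is a hypothetical helper, and the exact output of export_stats("prometheus") may differ.

```python
def to_prometheus(metrics: dict[str, tuple[str, str, float]]) -> str:
    """Render metrics as Prometheus text.

    metrics maps a metric name to (type, help text, value).
    """
    lines = []
    for name, (metric_type, help_text, value) in metrics.items():
        lines.append(f"# HELP {name} {help_text}")
        lines.append(f"# TYPE {name} {metric_type}")
        lines.append(f"{name} {value}")
    return "\n".join(lines) + "\n"


text = to_prometheus(
    {
        "projectx_api_calls_total": ("counter", "Total API calls", 1234),
        "projectx_health_score": ("gauge", "Current health score", 85.5),
    }
)
```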

Datadog Integration

async def datadog_integration():
    suite = await TradingSuite.create("MNQ")

    # Export Datadog-compatible metrics
    metrics = await suite.export_stats("datadog")

    # Example: Send to Datadog (requires datadog library)
    # from datadog import api
    #
    # for metric in metrics:
    #     api.Metric.send(
    #         metric='projectx.health_score',
    #         points=metric['value'],
    #         tags=['environment:production']
    #     )

    await suite.disconnect()

CSV Analytics

async def csv_analytics():
    suite = await TradingSuite.create("MNQ")

    # Export CSV for analytics
    csv_data = await suite.export_stats("csv")

    # Save for analysis
    with open("trading_stats.csv", "w") as f:
        f.write(csv_data)

    # Example: Load with pandas for analysis
    # import pandas as pd
    # df = pd.read_csv("trading_stats.csv")
    # print(df.describe())

    await suite.disconnect()

Troubleshooting

Common Issues

High Error Rates

Check component error counts and logs for specific issues.

Low Health Scores

Review individual component health metrics to identify bottlenecks.

Memory Usage Spikes

Monitor circular buffer sizes and cleanup frequencies.

Slow Statistics Collection

Check network connectivity and component response times.

Debugging

async def debug_statistics():
    suite = await TradingSuite.create("MNQ")

    # Enable debug logging
    import logging
    logging.getLogger("project_x_py.statistics").setLevel(logging.DEBUG)

    # Get raw component statistics
    for component_name in ["orders", "positions", "data"]:
        if hasattr(suite, component_name):
            component = getattr(suite, component_name)
            if hasattr(component, "get_stats"):
                stats = await component.get_stats()
                print(f"{component_name}: {stats}")

    await suite.disconnect()

Examples

The repository includes comprehensive examples demonstrating the statistics system:

  • 20_statistics_usage.py - Complete statistics system demonstration

  • 24_bounded_statistics_demo.py - Memory-bounded statistics with limits

  • 19_risk_manager_live_demo.py - Risk manager statistics in action

  • 22_circuit_breaker_protection.py - Health monitoring with circuit breakers

See Also