Source code for project_x_py.statistics.aggregator

"""
Centralized statistics aggregation for ProjectX SDK.

Author: @TexasCoding
Date: 2025-08-21

Overview:
    StatisticsAggregator provides centralized collection and aggregation of statistics
    from all registered SDK components. Features parallel collection using asyncio.gather(),
    cross-component metrics calculation, health score aggregation, and performance
    optimization through TTL caching. Handles component failures gracefully with
    timeout protection and partial result recovery.

Key Features:
    - 100% async architecture with parallel component collection
    - Centralized registration system for statistics providers
    - Cross-component metrics calculation (total errors, combined P&L, etc.)
    - Health score aggregation (mean of the component-reported scores)
    - TTL caching for performance optimization (5-second default)
    - Graceful error handling with timeout protection (1 second per component)
    - Partial result recovery when some components fail
    - Type-safe statistics using ComprehensiveStats and TradingSuiteStats

Components Supported:
    - TradingSuite: Suite-level statistics and component orchestration
    - OrderManager: Order lifecycle and execution metrics
    - PositionManager: P&L analysis and position tracking
    - RealtimeDataManager: Data throughput and latency monitoring
    - OrderBook: Market microstructure and depth analysis
    - RiskManager: Risk assessment and managed trade monitoring

Cross-Component Metrics:
    - Total errors across all components
    - Overall health score (mean of the component-reported scores)
    - System-wide performance metrics (API calls, response times)
    - Combined P&L from position and risk managers
    - Total memory usage and resource utilization
    - Aggregated data throughput and processing rates

Example Usage:
    ```python
    from project_x_py.statistics.aggregator import StatisticsAggregator

    # Initialize aggregator
    aggregator = StatisticsAggregator()

    # Register components
    await aggregator.register_component("trading_suite", trading_suite)
    await aggregator.register_component("order_manager", order_manager)

    # Get comprehensive statistics
    stats = await aggregator.get_comprehensive_stats()
    print(f"Overall Health: {stats['suite']['health_score']}")

    # Get suite-level statistics only
    suite_stats = await aggregator.get_suite_stats()
    print(f"Total Errors: {suite_stats['total_errors']}")
    ```

Performance Considerations:
    - Parallel collection reduces total time from sum of components to max component time
    - TTL caching prevents redundant expensive operations within 5-second windows
    - Timeout protection (1 second per component) prevents hanging on failed components
    - Memory-efficient partial result handling for large-scale deployments
    - Graceful degradation ensures aggregator remains functional even with component failures

See Also:
    - `project_x_py.statistics.base`: Base statistics tracking infrastructure
    - `project_x_py.statistics.collector`: Component-specific collection
    - `project_x_py.types.stats_types`: TypedDict definitions for type safety
"""

import asyncio
import time
from typing import TYPE_CHECKING, Any, Protocol

from project_x_py.statistics.base import BaseStatisticsTracker
from project_x_py.statistics.collector import ComponentCollector
from project_x_py.types.stats_types import (
    ComponentStats,
    ComprehensiveStats,
    TradingSuiteStats,
)

if TYPE_CHECKING:
    pass


class ComponentProtocol(Protocol):
    """
    Protocol for components that can provide statistics.

    Note: While the v3.3.0 statistics system is 100% async internally,
    this protocol supports both sync and async methods for backward
    compatibility during migration. New components should implement
    only the async methods.
    """

    async def get_statistics(self) -> dict[str, Any] | None:
        """Get component statistics (async - PREFERRED)."""
        ...

    async def get_health_score(self) -> float:
        """Get component health score (0-100) - async only."""
        ...


class StatisticsAggregator(BaseStatisticsTracker):
    """
    Centralized statistics aggregation for all ProjectX SDK components.

    Components are registered by name. Statistics are then collected from
    them concurrently, combined into cross-component metrics, and cached so
    repeated calls inside the cache window are cheap. A component that fails
    or times out is skipped; the remaining results are still returned.

    Features:
        - Async component registration and management
        - Concurrent statistics collection using asyncio.gather()
        - Cross-component metrics calculation (total errors, API calls,
          cache hit rate, memory usage)
        - Health score aggregation (mean of the component-reported scores)
        - TTL caching of aggregated results (5-second default)
        - Timeout protection (1 second per component by default)
        - Graceful error handling with partial results
        - Type-safe statistics using ComprehensiveStats

    Compatibility:
        Components may also be attached by plain attribute assignment
        (``aggregator.order_manager = ...``); those are queued and registered
        lazily the next time suite statistics are requested.
    """
def __init__(self, cache_ttl: float = 5.0, component_timeout: float = 1.0):
    """
    Create an aggregator with no registered components.

    Args:
        cache_ttl: Lifetime in seconds of cached aggregation results
            (default: 5.0); forwarded to the base tracker
        component_timeout: Seconds allowed for collecting from a single
            component (default: 1.0)
    """
    super().__init__("statistics_aggregator", cache_ttl=cache_ttl)

    # Timestamps of the latest successful collections (None until one succeeds)
    self._last_suite_collection: float | None = None
    self._last_comprehensive_collection: float | None = None

    # Attached only while a TradingSuite-like component is registered
    self._collector: ComponentCollector | None = None

    # Name -> component registry and the lock that guards it
    self._component_lock = asyncio.Lock()
    self._components: dict[str, Any] = {}

    self.component_timeout = component_timeout
async def register_component(self, name: str, component: Any) -> None:
    """
    Add a component to the set polled for statistics.

    A component is expected to expose at least one of ``get_stats()``,
    ``get_statistics()``, ``get_memory_stats()`` or ``get_health_score()``;
    which accessors exist is discovered when statistics are collected.

    Args:
        name: Unique name for the component
        component: Component instance to register

    Raises:
        ValueError: If component name is already registered
    """
    async with self._component_lock:
        if name in self._components:
            message = f"Component '{name}' already registered"
            # Record the failure on the tracker before surfacing it to the caller
            await self.track_error(
                ValueError(message),
                "Component registration",
                {"component_name": name},
            )
            raise ValueError(message)

        self._components[name] = component
        await self.increment("components_registered")
        await self.set_status("active")

        # A trading suite exposing ``orders`` gets a dedicated collector
        looks_like_suite = name == "trading_suite" and hasattr(component, "orders")
        if looks_like_suite:
            self._collector = ComponentCollector(component)
async def unregister_component(self, name: str) -> None:
    """
    Drop a component from the set polled for statistics.

    Removing the ``trading_suite`` entry also detaches the specialized
    collector, and removing the last component switches the tracker
    status to ``"idle"``.

    Args:
        name: Name of the component to remove

    Raises:
        KeyError: If component name is not registered
    """
    async with self._component_lock:
        if name not in self._components:
            message = f"Component '{name}' not registered"
            await self.track_error(
                KeyError(message),
                "Component unregistration",
                {"component_name": name},
            )
            raise KeyError(message)

        del self._components[name]
        await self.increment("components_unregistered")

        # The collector is bound to the trading suite instance
        if name == "trading_suite":
            self._collector = None

        # Nothing left to poll
        if len(self._components) == 0:
            await self.set_status("idle")
async def get_comprehensive_stats(self) -> ComprehensiveStats:
    """
    Collect statistics from every registered component into one report.

    A cached report is returned when one is available. Otherwise component
    statistics are gathered, suite-level metrics are derived from them, and
    the per-component sections present in the collected data are attached.
    The finished report is cached for subsequent calls.

    Components attached through the legacy attribute interface are
    registered first, mirroring ``get_suite_stats()``. The tracker status is
    switched to ``"collecting"`` only on a cache miss, so a cache hit leaves
    the current status untouched.

    Returns:
        ComprehensiveStats containing ``suite``, ``generated_at`` and
        ``collection_time_ms``, plus whichever of ``order_manager``,
        ``position_manager``, ``data_manager``, ``orderbook``,
        ``risk_manager``, ``realtime``, ``http_client``, ``cache`` and
        ``memory`` were collected. If collection raises, the error is
        tracked, the status becomes ``"error"`` and a minimal error report
        is returned instead of propagating the exception.
    """
    # Register pending components if needed (compatibility layer)
    if getattr(self, "_pending_components", None):
        await self._register_all_pending_components()

    collection_start = time.time()

    try:
        # Check cache first; a hit must not disturb the tracker status
        cached_stats = await self._get_cached_value("comprehensive_stats")
        if cached_stats is not None:
            await self.increment("cache_hits")
            return cached_stats  # type: ignore[no-any-return]

        await self.set_status("collecting")

        # Collect from all components in parallel
        component_stats = await self._collect_all_components()

        # Get suite-level statistics
        suite_stats = await self._build_suite_stats(component_stats)

        stats: ComprehensiveStats = {
            "suite": suite_stats,
            "generated_at": time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()),
            "collection_time_ms": round((time.time() - collection_start) * 1000, 2),
        }

        # Add component-specific statistics if available
        if "order_manager" in component_stats:
            stats["order_manager"] = component_stats["order_manager"]
        if "position_manager" in component_stats:
            stats["position_manager"] = component_stats["position_manager"]
        if "data_manager" in component_stats:
            stats["data_manager"] = component_stats["data_manager"]
        elif "realtime_data_manager" in component_stats:
            # The attribute-assignment compatibility layer registers the data
            # manager under this name; expose it under the documented key.
            stats["data_manager"] = component_stats["realtime_data_manager"]
        if "orderbook" in component_stats:
            stats["orderbook"] = component_stats["orderbook"]
        if "risk_manager" in component_stats:
            stats["risk_manager"] = component_stats["risk_manager"]

        # Add connection and performance statistics if available
        if "realtime" in component_stats:
            stats["realtime"] = component_stats["realtime"]
        if "http_client" in component_stats:
            stats["http_client"] = component_stats["http_client"]
        if "cache" in component_stats:
            stats["cache"] = component_stats["cache"]
        if "memory" in component_stats:
            stats["memory"] = component_stats["memory"]

        # Cache the result
        await self._set_cached_value("comprehensive_stats", stats)

        await self.increment("comprehensive_collections")
        await self.record_timing(
            "comprehensive_collection", (time.time() - collection_start) * 1000
        )
        await self.set_status("active")

        self._last_comprehensive_collection = time.time()
        return stats

    except Exception as e:
        await self.track_error(e, "Comprehensive statistics collection failed")
        await self.set_status("error")

        # Return minimal stats on error
        return self._get_error_stats(collection_start)
async def get_suite_stats(self) -> TradingSuiteStats:
    """
    Get TradingSuite-level statistics with cross-component metrics.

    Components attached through the legacy attribute interface are
    registered first. A cached result is returned when one is available;
    otherwise component statistics are collected, folded into suite-level
    metrics, and cached. The tracker status is switched to ``"collecting"``
    only on a cache miss, so a cache hit leaves the current status
    untouched.

    Returns:
        TradingSuiteStats with suite-level metrics and component summary.
        If collection raises, the error is tracked, the status becomes
        ``"error"`` and minimal placeholder statistics are returned instead
        of propagating the exception.
    """
    # Register pending components if needed (compatibility layer)
    if getattr(self, "_pending_components", None):
        await self._register_all_pending_components()

    collection_start = time.time()

    try:
        # Check cache first; a hit must not disturb the tracker status
        cached_stats = await self._get_cached_value("suite_stats")
        if cached_stats is not None:
            await self.increment("cache_hits")
            return cached_stats  # type: ignore[no-any-return]

        await self.set_status("collecting")

        # Collect component data
        component_stats = await self._collect_all_components()

        # Build suite statistics
        suite_stats = await self._build_suite_stats(component_stats)

        # Cache the result
        await self._set_cached_value("suite_stats", suite_stats)

        await self.increment("suite_collections")
        await self.record_timing(
            "suite_collection", (time.time() - collection_start) * 1000
        )
        await self.set_status("active")

        self._last_suite_collection = time.time()
        return suite_stats

    except Exception as e:
        await self.track_error(e, "Suite statistics collection failed")
        await self.set_status("error")

        # Return minimal stats on error
        return await self._get_minimal_suite_stats()
async def _collect_all_components(self) -> dict[str, Any]: """ Collect statistics from all registered components in parallel. Uses asyncio.gather() to collect statistics from all components simultaneously, with timeout protection and graceful error handling. Failed components don't prevent collection from other components. Returns: Dictionary of component statistics keyed by component name """ if not self._components: return {} # If we have a collector, use it for detailed component stats if self._collector is not None: try: return await asyncio.wait_for( self._collector.collect(), timeout=self.component_timeout * len(self._components), ) except TimeoutError: await self.track_error( TimeoutError("Component collector timed out"), "Parallel component collection", ) except Exception as e: await self.track_error(e, "Component collector failed") # Fallback to direct component collection async with self._component_lock: components = list(self._components.items()) # Create collection tasks with timeout protection tasks = [] for name, component in components: task = asyncio.create_task(self._collect_component_stats(name, component)) tasks.append(task) # Collect with timeout protection try: results = await asyncio.wait_for( asyncio.gather(*tasks, return_exceptions=True), timeout=self.component_timeout * len(components), ) # Process results and handle exceptions component_stats = {} for (name, _), result in zip(components, results, strict=False): if isinstance(result, Exception): await self.track_error( result, f"Failed to collect statistics from {name}", {"component_name": name}, ) elif result is not None: component_stats[name] = result return component_stats except TimeoutError: await self.track_error( TimeoutError("Component collection timed out"), "Parallel component collection", ) return {} async def _collect_component_stats( self, name: str, component: Any ) -> dict[str, Any] | None: """ Collect statistics from a single component with timeout protection. 
Tries multiple methods to get statistics from the component: 1. get_statistics() (async) 2. get_stats() (sync) 3. get_memory_stats() (sync) 4. Direct stats attribute access Args: name: Component name for error reporting component: Component instance to collect from Returns: Component statistics dictionary or None if collection fails """ try: start_time = time.time() # Try async get_statistics() first if hasattr(component, "get_statistics"): try: if asyncio.iscoroutinefunction(component.get_statistics): result = await asyncio.wait_for( component.get_statistics(), timeout=self.component_timeout ) else: result = component.get_statistics() if result: await self.record_timing( f"{name}_collection", (time.time() - start_time) * 1000 ) return dict(result) if isinstance(result, dict) else None except (AttributeError, TypeError, TimeoutError): pass # Try sync get_stats() if hasattr(component, "get_stats"): try: result = component.get_stats() if result: await self.record_timing( f"{name}_collection", (time.time() - start_time) * 1000 ) return dict(result) if isinstance(result, dict) else None except (AttributeError, TypeError): pass # Try sync get_memory_stats() if hasattr(component, "get_memory_stats"): try: result = component.get_memory_stats() if result: await self.record_timing( f"{name}_collection", (time.time() - start_time) * 1000 ) return dict(result) if isinstance(result, dict) else None except (AttributeError, TypeError): pass # Try direct stats attribute if hasattr(component, "stats"): try: result = dict(component.stats) if component.stats else None if result: await self.record_timing( f"{name}_collection", (time.time() - start_time) * 1000 ) return result except (AttributeError, TypeError): pass return None except Exception as e: await self.track_error( e, f"Component statistics collection failed for {name}", {"component_name": name}, ) return None async def _build_suite_stats( self, component_stats: dict[str, Any] ) -> TradingSuiteStats: """ Build TradingSuite 
statistics with cross-component metrics. Aggregates statistics from all components to create suite-level metrics including total errors, overall health score, and system-wide performance indicators. Calculates cross-component derived metrics. Args: component_stats: Dictionary of component statistics Returns: TradingSuiteStats with aggregated suite-level metrics """ # Get trading suite component for basic info trading_suite = self._components.get("trading_suite") # Basic suite information suite_id = ( getattr(trading_suite, "suite_id", "unknown") if trading_suite else "unknown" ) instrument = ( getattr(trading_suite, "instrument", "unknown") if trading_suite else "unknown" ) created_at = ( getattr(trading_suite, "created_at", time.time()) if trading_suite else time.time() ) if isinstance(created_at, int | float): created_at_str = time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(created_at)) else: created_at_str = str(created_at) uptime_seconds = int( time.time() - (created_at if isinstance(created_at, int | float) else time.time()) ) # Calculate cross-component metrics cross_metrics = await self._calculate_cross_metrics(component_stats) # Build component status summary components: dict[str, ComponentStats] = {} for name, stats in component_stats.items(): if isinstance(stats, dict): component_stat: ComponentStats = { "name": name, "status": stats.get("status", "unknown"), "uptime_seconds": int(stats.get("uptime_seconds", 0)), "last_activity": stats.get("last_activity"), "error_count": int(stats.get("error_count", 0)), "memory_usage_mb": float(stats.get("memory_usage_mb", 0.0)), } # Add optional performance_metrics if present perf_metrics = stats.get("performance_metrics") if perf_metrics: component_stat["performance_metrics"] = perf_metrics components[name] = component_stat # Determine overall status if not components: status = "disconnected" connected = False elif any(comp.get("status") == "error" for comp in components.values()): status = "error" connected = False 
elif all( comp.get("status") in ["connected", "active"] for comp in components.values() ): status = "active" connected = True else: status = "connecting" connected = False # Connection status realtime_connected = False user_hub_connected = False market_hub_connected = False if trading_suite and hasattr(trading_suite, "data"): data_manager = trading_suite.data if hasattr(data_manager, "is_connected"): try: if asyncio.iscoroutinefunction(data_manager.is_connected): realtime_connected = await data_manager.is_connected() else: realtime_connected = data_manager.is_connected() except Exception: pass # Features and timeframes features_enabled = [] timeframes = [] if trading_suite: if hasattr(trading_suite, "features"): features_enabled = list(trading_suite.features) if hasattr(trading_suite, "timeframes"): timeframes = list(trading_suite.timeframes) # Build the suite statistics suite_stats: TradingSuiteStats = { "suite_id": suite_id, "instrument": instrument, "created_at": created_at_str, "uptime_seconds": uptime_seconds, "status": status, "connected": connected, "components": components, "realtime_connected": realtime_connected, "user_hub_connected": user_hub_connected, "market_hub_connected": market_hub_connected, "total_api_calls": cross_metrics["total_api_calls"], "successful_api_calls": cross_metrics["successful_api_calls"], "failed_api_calls": cross_metrics["failed_api_calls"], "avg_response_time_ms": cross_metrics["avg_response_time_ms"], "cache_hit_rate": cross_metrics["cache_hit_rate"], "memory_usage_mb": cross_metrics["memory_usage_mb"], "active_subscriptions": cross_metrics["active_subscriptions"], "message_queue_size": cross_metrics["message_queue_size"], "features_enabled": features_enabled, "timeframes": timeframes, "total_errors": cross_metrics["total_errors"], "health_score": cross_metrics["health_score"], } return suite_stats async def _calculate_cross_metrics( self, component_stats: dict[str, Any] ) -> dict[str, Any]: """ Calculate cross-component 
metrics from all component statistics. Aggregates metrics across all components to provide system-wide performance indicators, error totals, combined P&L, and overall health scoring. Args: component_stats: Dictionary of component statistics Returns: Dictionary with calculated cross-component metrics """ # Initialize aggregated metrics total_errors = 0 total_api_calls = 0 successful_api_calls = 0 failed_api_calls = 0 response_times = [] cache_hits = 0 cache_total = 0 memory_usage_mb = 0.0 active_subscriptions = 0 message_queue_size = 0 health_scores = [] # Aggregate metrics from all components for _, stats in component_stats.items(): if not isinstance(stats, dict): continue # Error counts total_errors += stats.get("error_count", 0) # API call metrics if "total_requests" in stats: # HTTP client stats total_api_calls += stats.get("total_requests", 0) successful_api_calls += stats.get("successful_requests", 0) failed_api_calls += stats.get("failed_requests", 0) # Response time metrics avg_response = stats.get("avg_response_time_ms", 0) if avg_response > 0: response_times.append(avg_response) # Cache metrics if "cache_hits" in stats: cache_hits += stats.get("cache_hits", 0) cache_total += stats.get("cache_hits", 0) + stats.get("cache_misses", 0) # Memory usage memory_usage_mb += stats.get("memory_usage_mb", 0.0) # Connection metrics active_subscriptions += stats.get("subscriptions_active", 0) active_subscriptions += stats.get("active_subscriptions", 0) message_queue_size += stats.get("message_queue_size", 0) # Health scores for aggregation if "health_score" in stats: health_scores.append(stats["health_score"]) # Calculate derived metrics avg_response_time_ms = ( sum(response_times) / len(response_times) if response_times else 0.0 ) cache_hit_rate = (cache_hits / cache_total) if cache_total > 0 else 0.0 # Calculate overall health score (weighted average) if health_scores: health_score = sum(health_scores) / len(health_scores) else: # Default health calculation based on 
errors and activity base_health = 100.0 if total_errors > 0 and total_api_calls > 0: error_rate = total_errors / max(total_api_calls, 1) base_health = max(0, 100 - (error_rate * 100)) health_score = base_health return { "total_errors": total_errors, "total_api_calls": total_api_calls, "successful_api_calls": successful_api_calls, "failed_api_calls": failed_api_calls, "avg_response_time_ms": round(avg_response_time_ms, 2), "cache_hit_rate": round(cache_hit_rate, 4), "memory_usage_mb": round(memory_usage_mb, 2), "active_subscriptions": active_subscriptions, "message_queue_size": message_queue_size, "health_score": round(health_score, 1), } async def _get_minimal_suite_stats(self) -> TradingSuiteStats: """ Get minimal suite statistics for error scenarios. Returns basic suite statistics when normal collection fails, ensuring the aggregator can always return some useful information. Returns: TradingSuiteStats with minimal default values """ current_time = time.time() return { "suite_id": "error", "instrument": "unknown", "created_at": time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(current_time)), "uptime_seconds": 0, "status": "error", "connected": False, "components": {}, "realtime_connected": False, "user_hub_connected": False, "market_hub_connected": False, "total_api_calls": 0, "successful_api_calls": 0, "failed_api_calls": 0, "avg_response_time_ms": 0.0, "cache_hit_rate": 0.0, "memory_usage_mb": 0.0, "active_subscriptions": 0, "message_queue_size": 0, "features_enabled": [], "timeframes": [], "total_errors": 1, # Count the collection failure "health_score": 0.0, } def _get_error_stats(self, collection_start: float) -> ComprehensiveStats: """ Get error statistics for comprehensive collection failures. 
Args: collection_start: Timestamp when collection started Returns: ComprehensiveStats with error information """ current_time = time.time() # Create minimal suite stats suite_stats: TradingSuiteStats = { "suite_id": "error", "instrument": "unknown", "created_at": time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(current_time)), "uptime_seconds": 0, "status": "error", "connected": False, "components": {}, "realtime_connected": False, "user_hub_connected": False, "market_hub_connected": False, "total_api_calls": 0, "successful_api_calls": 0, "failed_api_calls": 0, "avg_response_time_ms": 0.0, "cache_hit_rate": 0.0, "memory_usage_mb": 0.0, "active_subscriptions": 0, "message_queue_size": 0, "features_enabled": [], "timeframes": [], "total_errors": 1, "health_score": 0.0, } return { "suite": suite_stats, "generated_at": time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime()), "collection_time_ms": round((current_time - collection_start) * 1000, 2), }
async def get_registered_components(self) -> list[str]:
    """
    Return the names of all currently registered components.

    The registry is read while holding the component lock.

    Returns:
        List of component names currently registered
    """
    async with self._component_lock:
        names = [*self._components]
    return names
async def get_component_count(self) -> int:
    """
    Return how many components are currently registered.

    The registry is read while holding the component lock.

    Returns:
        Number of components currently registered
    """
    async with self._component_lock:
        total = len(self._components)
    return total
async def clear_all_components(self) -> None:
    """
    Unregister every component at once.

    Detaches the specialized collector, adds the number of removed
    components to the ``components_cleared`` counter and switches the
    tracker status to ``"idle"``. Useful for cleanup or testing scenarios.
    """
    async with self._component_lock:
        removed = len(self._components)

        self._collector = None
        self._components.clear()

        await self.increment("components_cleared", removed)
        await self.set_status("idle")
# Compatibility layer for TradingSuite v3.2.x and earlier
async def aggregate_stats(self, force_refresh: bool = False) -> TradingSuiteStats:
    """
    Backward-compatible entry point used by TradingSuite.

    Delegates to ``get_suite_stats()``; new code should call that method
    directly.

    Args:
        force_refresh: When true, empty the cache first so fresh statistics
            are collected

    Returns:
        TradingSuiteStats: Aggregated statistics from all components
    """
    if force_refresh:
        # Dropping the cache makes the delegated call recollect
        self._cache.clear()

    suite_stats = await self.get_suite_stats()
    return suite_stats
def __setattr__(self, name: str, value: Any) -> None:
    """
    Intercept attribute assignment to support the legacy wiring style.

    Older TradingSuite code attaches components by plain assignment::

        aggregator.order_manager = order_manager
        aggregator.data_manager = data_manager

    A non-None value assigned to one of the known component attributes is
    queued in ``_pending_components`` under its registry name, to be
    registered later by an async stats call. The attribute itself is always
    set as usual.
    """
    # Attribute name -> name used in the component registry
    registry_name = {
        "trading_suite": "trading_suite",
        "order_manager": "order_manager",
        "position_manager": "position_manager",
        "data_manager": "realtime_data_manager",
        "orderbook": "orderbook",
        "risk_manager": "risk_manager",
        "client": "client",
        "realtime_client": "realtime_client",
    }.get(name)

    if registry_name is not None and value is not None:
        # Registration is async, so it can only be queued from here
        if not hasattr(self, "_pending_components"):
            self._pending_components = {}
        pending = self._pending_components
        pending[registry_name] = value

    # Always call parent __setattr__
    super().__setattr__(name, value)
async def _register_all_pending_components(self) -> None:
    """
    Register all components that were set via direct assignment.

    Every queued entry is removed from ``_pending_components`` whether or
    not its registration succeeds. ``register_component()`` only fails for
    a name that is already taken, so retrying on later calls could never
    succeed and would log the same warning on every stats request. An entry
    whose exact component object is already registered under that name is
    dropped silently.
    """
    pending = getattr(self, "_pending_components", None)
    if not pending:
        return

    import logging

    logger = logging.getLogger(__name__)
    registered = getattr(self, "_components", {})

    # Iterate over a snapshot: entries are removed as they are processed
    for name, component in list(pending.items()):
        pending.pop(name, None)

        if registered.get(name) is component:
            continue

        try:
            await self.register_component(name, component)
        except Exception as e:
            # Log error but don't fail - this is for backward compatibility
            logger.warning("Failed to auto-register component %s: %s", name, e)


async def _register_pending_component(self, name: str, component: Any) -> None:
    """
    Register a single component set via direct assignment.

    Args:
        name: Registry name for the component
        component: Component instance to register

    A registration failure is logged as a warning and not raised.
    """
    try:
        await self.register_component(name, component)
    except Exception as e:
        # Log error but don't fail - this is for backward compatibility
        import logging

        logging.getLogger(__name__).warning(
            "Failed to auto-register component %s: %s", name, e
        )
# Names exported by ``from project_x_py.statistics.aggregator import *``
__all__ = [
    "StatisticsAggregator",
    "ComponentProtocol",
]