Source code for project_x_py.statistics.health

"""
Health monitoring and scoring system for ProjectX SDK components.

Author: @TexasCoding
Date: 2025-08-21

Overview:
    Provides comprehensive health monitoring with intelligent scoring algorithms
    that evaluate system health based on multiple factors including error rates,
    performance metrics, connection stability, resource usage, and data quality.
    All operations are 100% async with configurable thresholds and alert levels.

Key Features:
    - Multi-factor health scoring (0-100 scale)
    - Weighted health categories with configurable thresholds
    - Actionable health alerts and recommendations
    - Smooth scoring transitions to prevent false alerts
    - Graceful handling of missing statistics
    - Trend analysis for early warning detection
    - Performance-optimized calculations

Health Categories:
    - Error Rates (25% weight): Lower error rates = higher scores
    - Performance (20% weight): Response times, latency, throughput
    - Connection Stability (20% weight): WebSocket connections, reconnections
    - Resource Usage (15% weight): Memory, CPU, API calls
    - Data Quality (15% weight): Validation errors, data gaps
    - Component Status (5% weight): Active, connected, etc.

Alert Levels:
    - HEALTHY (80-100): All systems operating normally
    - WARNING (60-79): Minor issues detected, monitoring recommended
    - DEGRADED (40-59): Significant issues, intervention suggested
    - CRITICAL (0-39): System failure risk, immediate action required

Example Usage:
    ```python
    from project_x_py.statistics.health import HealthMonitor

    monitor = HealthMonitor()

    # Calculate overall health score
    health_score = await monitor.calculate_health(comprehensive_stats)
    print(f"System Health: {health_score}%")

    # Get detailed breakdown
    breakdown = await monitor.get_health_breakdown(comprehensive_stats)
    print(f"Error Score: {breakdown['errors']}")
    print(f"Performance Score: {breakdown['performance']}")

    # Check for alerts
    alerts = await monitor.get_health_alerts(comprehensive_stats)
    for alert in alerts:
        print(f"{alert['level']}: {alert['message']}")
    ```

Configuration:
    Health scoring thresholds are configurable via constructor parameters,
    allowing customization for different deployment environments and
    performance requirements.

See Also:
    - `project_x_py.types.stats_types.ComprehensiveStats`: Input statistics type
    - `project_x_py.statistics.aggregator`: Statistics collection and aggregation
    - `project_x_py.statistics.base`: Base statistics tracking infrastructure
"""

import asyncio
import time
from dataclasses import dataclass
from enum import Enum
from typing import Any, NotRequired, TypedDict

from project_x_py.types.stats_types import ComprehensiveStats


class AlertLevel(Enum):
    """Severity levels attached to health alerts.

    Every member's value is its own name as an upper-case string. The
    module overview associates each level with a health-score band; the
    bands are noted next to the members below.
    """

    HEALTHY = "HEALTHY"  # Score band 80-100: operating normally
    WARNING = "WARNING"  # Score band 60-79: minor issues
    DEGRADED = "DEGRADED"  # Score band 40-59: significant issues
    CRITICAL = "CRITICAL"  # Score band 0-39: immediate action required


class HealthAlert(TypedDict):
    """A single health alert: what went wrong, how badly, and what to do.

    All keys are required.
    """

    level: str  # Severity, stored as an AlertLevel member's string value
    category: str  # Health category the alert belongs to (e.g. "errors")
    message: str  # Human-readable description of the problem
    metric: str  # Name of the metric that triggered the alert
    current_value: float | int | str  # Observed value (number or short text)
    threshold: float | int  # Threshold the observed value was compared to
    recommendation: str  # Suggested action to resolve the issue


class HealthBreakdown(TypedDict):
    """Per-category health scores plus the combined weighted score.

    The six category scores and ``weighted_total`` are always present;
    the two metadata keys are optional.
    """

    errors: float  # Error rate health score (0-100, higher is better)
    performance: float  # Performance health score (0-100)
    connection: float  # Connection stability health score (0-100)
    resources: float  # Resource usage health score (0-100)
    data_quality: float  # Data quality health score (0-100)
    component_status: float  # Component status health score (0-100)

    # Combined score: each category score multiplied by its weight, summed
    weighted_total: float

    # Optional metadata
    missing_categories: NotRequired[list[str]]  # Categories with no data
    calculation_time_ms: NotRequired[float]  # Time taken to calculate, in ms


@dataclass
class HealthThresholds:
    """Configurable thresholds for health scoring.

    Each metric family defines four ascending cut-offs: ``excellent``,
    ``good``, ``warning`` and ``critical``. Lower measured values are
    better for every family.
    """

    # ---- Error rate, expressed as errors per 1000 operations ----
    error_rate_excellent: float = 1.0  # 0.1% of operations
    error_rate_good: float = 5.0  # 0.5% of operations
    error_rate_warning: float = 20.0  # 2% of operations
    error_rate_critical: float = 50.0  # 5% of operations

    # ---- Response time, in milliseconds ----
    response_time_excellent: float = 100.0  # 100 ms
    response_time_good: float = 500.0  # 500 ms
    response_time_warning: float = 2000.0  # 2 s
    response_time_critical: float = 5000.0  # 5 s

    # ---- Connection stability, in reconnections per hour ----
    reconnection_excellent: int = 0  # no reconnections
    reconnection_good: int = 2  # up to 2 per hour
    reconnection_warning: int = 10  # up to 10 per hour
    reconnection_critical: int = 30  # 30 per hour

    # ---- Memory usage, as a percentage ----
    memory_usage_excellent: float = 50.0  # 50% used
    memory_usage_good: float = 70.0  # 70% used
    memory_usage_warning: float = 85.0  # 85% used
    memory_usage_critical: float = 95.0  # 95% used

    # ---- Data quality, as validation errors per 1000 data points ----
    validation_error_excellent: float = 0.1  # 0.01% of data points
    validation_error_good: float = 1.0  # 0.1% of data points
    validation_error_warning: float = 5.0  # 0.5% of data points
    validation_error_critical: float = 10.0  # 1% of data points


class HealthMonitor:
    """
    Comprehensive health monitoring with intelligent scoring algorithms.

    Evaluates system health across six categories (error rates, performance,
    connection stability, resource usage, data quality and component status).
    Each category is scored 0-100 and the scores are combined through
    configurable weights.

    Features:
        - Multi-factor health scoring with weighted categories
        - Configurable thresholds for different environments
        - Piecewise-linear scoring between thresholds, so a score degrades
          gradually until the critical threshold is exceeded
        - Categories without data are scored as healthy (100)
        - Overall score cached for a short time-to-live
        - Alerts carrying a severity level and a recommendation
    """

    # Category names in the fixed order used for scoring and weighting.
    _CATEGORIES = (
        "errors",
        "performance",
        "connection",
        "resources",
        "data_quality",
        "component_status",
    )

    def __init__(
        self,
        thresholds: HealthThresholds | None = None,
        weights: dict[str, float] | None = None,
    ):
        """
        Initialize the health monitor with configurable thresholds and weights.

        Args:
            thresholds: Custom health thresholds (uses defaults if None)
            weights: Custom category weights (uses defaults if None or empty)

        Raises:
            ValueError: If the weights do not sum to 1.0 (tolerance 0.001)
        """
        self.thresholds = thresholds or HealthThresholds()

        # Default category weights (must sum to 1.0)
        self.weights = weights or {
            "errors": 0.25,  # Error rates are most critical
            "performance": 0.20,  # Performance impacts user experience
            "connection": 0.20,  # Connection stability is crucial
            "resources": 0.15,  # Resource usage affects sustainability
            "data_quality": 0.15,  # Data quality affects decisions
            "component_status": 0.05,  # Component status is basic indicator
        }

        # Validate weights sum to 1.0
        total_weight = sum(self.weights.values())
        if abs(total_weight - 1.0) > 0.001:
            raise ValueError(f"Health weights must sum to 1.0, got {total_weight}")

        # Cache for expensive calculations: key -> (value, stored-at timestamp)
        self._cache: dict[str, tuple[Any, float]] = {}
        self._cache_ttl = 5.0  # 5-second cache

        # Async lock guarding access to the cache
        self._lock = asyncio.Lock()

    async def calculate_health(self, stats: ComprehensiveStats) -> float:
        """
        Calculate overall health score based on comprehensive statistics.

        The result is cached under a single fixed key for ``_cache_ttl``
        seconds, so a call made within that window returns the previously
        computed score regardless of the ``stats`` passed in.

        Args:
            stats: Comprehensive statistics from all components

        Returns:
            Health score between 0-100 (100 = perfect health), rounded to
            one decimal place
        """
        cache_key = "overall_health"
        cached_score = await self._get_cached_value(cache_key)
        if cached_score is not None:
            return float(cached_score)

        scores = await self._collect_category_scores(stats)

        # Ensure score is within bounds
        bounded_score = max(0.0, min(100.0, self._weighted_total(scores)))

        # Round BEFORE caching so that a cache hit returns exactly the same
        # value as the call that populated the cache.
        final_score = round(bounded_score, 1)
        await self._set_cached_value(cache_key, final_score)
        return final_score

    async def get_health_breakdown(self, stats: ComprehensiveStats) -> HealthBreakdown:
        """
        Get detailed breakdown of health scores by category.

        This method does not use the score cache.

        Args:
            stats: Comprehensive statistics from all components

        Returns:
            Breakdown with each category score and the weighted total
            (rounded to one decimal), the list of categories that had no
            data (only when non-empty) and the calculation time in ms
        """
        # perf_counter is monotonic, so the measured duration cannot be
        # distorted by system clock adjustments.
        start_time = time.perf_counter()

        scores = await self._collect_category_scores(stats)
        weighted_total = self._weighted_total(scores)

        # Track missing categories (component_status has no availability check)
        availability_checks = (
            ("errors", self._has_error_data),
            ("performance", self._has_performance_data),
            ("connection", self._has_connection_data),
            ("resources", self._has_resource_data),
            ("data_quality", self._has_data_quality_data),
        )
        missing_categories = [
            name for name, has_data in availability_checks if not has_data(stats)
        ]

        calculation_time = (time.perf_counter() - start_time) * 1000  # to ms

        breakdown: HealthBreakdown = {
            "errors": round(scores["errors"], 1),
            "performance": round(scores["performance"], 1),
            "connection": round(scores["connection"], 1),
            "resources": round(scores["resources"], 1),
            "data_quality": round(scores["data_quality"], 1),
            "component_status": round(scores["component_status"], 1),
            "weighted_total": round(weighted_total, 1),
        }

        if missing_categories:
            breakdown["missing_categories"] = missing_categories

        breakdown["calculation_time_ms"] = round(calculation_time, 2)

        return breakdown

    async def get_health_alerts(self, stats: ComprehensiveStats) -> list[HealthAlert]:
        """
        Generate health alerts based on current statistics.

        Args:
            stats: Comprehensive statistics from all components

        Returns:
            List of health alerts sorted by severity (critical first);
            alerts of equal severity keep their category order
        """
        alerts: list[HealthAlert] = []

        checks = (
            self._check_error_alerts,
            self._check_performance_alerts,
            self._check_connection_alerts,
            self._check_resource_alerts,
            self._check_data_quality_alerts,
        )
        for check in checks:
            alerts.extend(await check(stats))

        # Sort alerts by severity (critical first); list.sort is stable
        severity_order = {"CRITICAL": 0, "DEGRADED": 1, "WARNING": 2, "HEALTHY": 3}
        alerts.sort(key=lambda x: severity_order.get(x["level"], 3))

        return alerts

    # Shared scoring helpers

    async def _collect_category_scores(
        self, stats: ComprehensiveStats
    ) -> dict[str, float]:
        """Compute the unrounded 0-100 score of every health category."""
        return {
            "errors": await self._score_errors(stats),
            "performance": await self._score_performance(stats),
            "connection": await self._score_connection(stats),
            "resources": await self._score_resources(stats),
            "data_quality": await self._score_data_quality(stats),
            "component_status": await self._score_component_status(stats),
        }

    def _weighted_total(self, scores: dict[str, float]) -> float:
        """Combine category scores into one value using ``self.weights``."""
        return sum(scores[name] * self.weights[name] for name in self._CATEGORIES)

    @staticmethod
    def _score_by_thresholds(
        value: float,
        excellent: float,
        good: float,
        warning: float,
        critical: float,
    ) -> float:
        """
        Map a "lower is better" metric onto a 0-100 score.

        Scores 100 up to ``excellent``, then interpolates linearly
        100 -> 80 up to ``good``, 80 -> 40 up to ``warning`` and 40 -> 10 up
        to ``critical``. Anything above ``critical`` scores 0.
        """
        if value <= excellent:
            return 100.0
        # Each branch below is only reached when value exceeds the previous
        # threshold, so the interpolation denominators are never zero.
        if value <= good:
            ratio = (value - excellent) / (good - excellent)
            return 100.0 - (ratio * 20.0)
        if value <= warning:
            ratio = (value - good) / (warning - good)
            return 80.0 - (ratio * 40.0)
        if value <= critical:
            ratio = (value - warning) / (critical - warning)
            return 40.0 - (ratio * 30.0)
        return 0.0

    @staticmethod
    def _component_error_totals(stats: ComprehensiveStats) -> tuple[Any, Any]:
        """
        Sum error counts and operation counts over all suite components.

        Operations are taken from the ``count`` entry of each dict found in
        a component's ``performance_metrics``.

        Returns:
            ``(total_errors, total_operations)``
        """
        total_errors = 0
        total_operations = 0
        for component_stats in stats["suite"]["components"].values():
            total_errors += component_stats.get("error_count", 0)
            if "performance_metrics" in component_stats:
                for metrics in component_stats["performance_metrics"].values():
                    if isinstance(metrics, dict) and "count" in metrics:
                        total_operations += metrics["count"]
        return total_errors, total_operations

    @staticmethod
    def _reconnections_per_hour(stats: ComprehensiveStats) -> float | None:
        """
        Return the realtime reconnection rate per hour of uptime.

        Returns None when there are no ``realtime`` statistics or the
        reported uptime is not positive.
        """
        if "realtime" not in stats:
            return None
        realtime_stats = stats["realtime"]
        reconnection_attempts = realtime_stats.get("reconnection_attempts", 0)
        uptime_hours = realtime_stats.get("connection_uptime_seconds", 0) / 3600
        if uptime_hours > 0:
            return reconnection_attempts / uptime_hours
        return None

    # Category scoring methods

    async def _score_errors(self, stats: ComprehensiveStats) -> float:
        """
        Score error rates across all components.

        HTTP client request/failure counts are added to the component
        totals when ``http_client`` statistics are present.

        Returns:
            Error health score (0-100, higher is better)
        """
        if not self._has_error_data(stats):
            return 100.0  # Assume healthy if no error data

        total_errors, total_operations = self._component_error_totals(stats)

        # Add API call statistics if available
        if "http_client" in stats:
            http_stats = stats["http_client"]
            total_operations += http_stats.get("total_requests", 0)
            total_errors += http_stats.get("failed_requests", 0)

        # Calculate error rate per 1000 operations
        if total_operations > 0:
            error_rate = (total_errors / total_operations) * 1000
        else:
            error_rate = 0.0

        limits = self.thresholds
        return self._score_by_thresholds(
            error_rate,
            limits.error_rate_excellent,
            limits.error_rate_good,
            limits.error_rate_warning,
            limits.error_rate_critical,
        )

    async def _score_performance(self, stats: ComprehensiveStats) -> float:
        """
        Score performance metrics including response times and latency.

        Averages the suite-level ``avg_response_time_ms`` (when positive)
        together with every component metric's ``avg_ms``.

        Returns:
            Performance health score (0-100, higher is better)
        """
        if not self._has_performance_data(stats):
            return 100.0  # Assume healthy if no performance data

        avg_response_time = stats["suite"].get("avg_response_time_ms", 0.0)

        # The suite-level figure only counts as a sample when it is positive
        total_response_time = avg_response_time
        metric_count = 1 if avg_response_time > 0 else 0

        for component_stats in stats["suite"]["components"].values():
            if "performance_metrics" in component_stats:
                for metrics in component_stats["performance_metrics"].values():
                    if isinstance(metrics, dict) and "avg_ms" in metrics:
                        total_response_time += metrics["avg_ms"]
                        metric_count += 1

        if metric_count == 0:
            return 100.0

        avg_performance_time = total_response_time / metric_count

        limits = self.thresholds
        return self._score_by_thresholds(
            avg_performance_time,
            limits.response_time_excellent,
            limits.response_time_good,
            limits.response_time_warning,
            limits.response_time_critical,
        )

    async def _score_connection(self, stats: ComprehensiveStats) -> float:
        """
        Score connection stability including WebSocket connections and reconnections.

        Half of the score comes from how many of the three connection flags
        are up; the other half is a stability allowance reduced by a
        reconnection-rate penalty.

        Returns:
            Connection health score (0-100, higher is better)
        """
        if not self._has_connection_data(stats):
            return 100.0  # Assume healthy if no connection data

        suite = stats["suite"]
        connections_up = sum(
            [
                suite.get("realtime_connected", False),
                suite.get("user_hub_connected", False),
                suite.get("market_hub_connected", False),
            ]
        )
        connection_score = (connections_up / 3.0) * 50.0  # 50% for basic connectivity

        # Check reconnection rates if available
        reconnection_penalty = 0.0
        reconnections_per_hour = self._reconnections_per_hour(stats)
        if reconnections_per_hour is not None:
            limits = self.thresholds
            if reconnections_per_hour <= limits.reconnection_excellent:
                reconnection_penalty = 0.0
            elif reconnections_per_hour <= limits.reconnection_good:
                ratio = (reconnections_per_hour - limits.reconnection_excellent) / (
                    limits.reconnection_good - limits.reconnection_excellent
                )
                reconnection_penalty = ratio * 10.0
            elif reconnections_per_hour <= limits.reconnection_warning:
                ratio = (reconnections_per_hour - limits.reconnection_good) / (
                    limits.reconnection_warning - limits.reconnection_good
                )
                reconnection_penalty = 10.0 + (ratio * 20.0)
            else:
                reconnection_penalty = 40.0

        # Stability score (remaining 50%)
        stability_score = max(0.0, 50.0 - reconnection_penalty)

        return min(100.0, connection_score + stability_score)

    async def _score_resources(self, stats: ComprehensiveStats) -> float:
        """
        Score resource usage including memory and API calls.

        Returns:
            Resource usage health score (0-100, higher is better),
            combining memory (70%) and cache efficiency (30%)
        """
        if not self._has_resource_data(stats):
            return 100.0  # Assume healthy if no resource data

        # Memory usage scoring (primary resource metric)
        memory_score = 100.0
        if "memory" in stats:
            memory_usage_percent = stats["memory"].get("memory_usage_percent", 0.0)
            limits = self.thresholds
            memory_score = self._score_by_thresholds(
                memory_usage_percent,
                limits.memory_usage_excellent,
                limits.memory_usage_good,
                limits.memory_usage_warning,
                limits.memory_usage_critical,
            )

        # API call efficiency (secondary metric): only penalized when the
        # cache hit rate drops below 50%
        api_efficiency_score = 100.0
        cache_hit_rate = stats["suite"].get("cache_hit_rate", 1.0)
        if cache_hit_rate < 0.5:
            api_efficiency_score = cache_hit_rate * 100.0

        # Combine scores (memory 70%, API efficiency 30%)
        return (memory_score * 0.7) + (api_efficiency_score * 0.3)

    async def _score_data_quality(self, stats: ComprehensiveStats) -> float:
        """
        Score data quality including validation errors and data gaps.

        Returns:
            Data quality health score (0-100, higher is better)
        """
        if not self._has_data_quality_data(stats):
            return 100.0  # Assume healthy if no data quality data

        total_validation_errors = 0
        total_data_points = 0

        # Check data manager statistics
        if "data_manager" in stats:
            data_stats = stats["data_manager"]
            total_validation_errors += data_stats.get("data_validation_errors", 0)
            total_data_points += data_stats.get("bars_processed", 0) + data_stats.get(
                "ticks_processed", 0
            )

        # Check orderbook statistics
        if "orderbook" in stats:
            orderbook_stats = stats["orderbook"]
            total_validation_errors += orderbook_stats.get(
                "invalid_updates", 0
            ) + orderbook_stats.get("duplicate_updates", 0)
            total_data_points += orderbook_stats.get("trades_processed", 0)

        # Calculate validation error rate per 1000 data points
        if total_data_points > 0:
            validation_error_rate = (total_validation_errors / total_data_points) * 1000
        else:
            validation_error_rate = 0.0

        limits = self.thresholds
        return self._score_by_thresholds(
            validation_error_rate,
            limits.validation_error_excellent,
            limits.validation_error_good,
            limits.validation_error_warning,
            limits.validation_error_critical,
        )

    async def _score_component_status(self, stats: ComprehensiveStats) -> float:
        """
        Score component status (active, connected, etc.).

        Returns:
            Component status health score (0-100, higher is better)
        """
        components = stats["suite"]["components"]
        total_components = len(components)
        if total_components == 0:
            return 100.0

        healthy_components = 0.0
        for component_stats in components.values():
            status = component_stats.get("status", "unknown")
            if status in ["connected", "active"]:
                healthy_components += 1
            elif status in ["initializing"]:
                healthy_components += 0.7  # Partial credit for initializing

        return (healthy_components / total_components) * 100.0

    # Alert generation methods

    @staticmethod
    def _build_alert(
        level: AlertLevel,
        category: str,
        message: str,
        metric: str,
        current_value: float | int | str,
        threshold: float | int,
        recommendation: str,
    ) -> HealthAlert:
        """Assemble one alert dictionary, storing the level's string value."""
        return {
            "level": level.value,
            "category": category,
            "message": message,
            "metric": metric,
            "current_value": current_value,
            "threshold": threshold,
            "recommendation": recommendation,
        }

    async def _check_error_alerts(self, stats: ComprehensiveStats) -> list[HealthAlert]:
        """
        Generate alerts for error rates.

        The rate is computed from component statistics only; at most one
        alert (the most severe that applies) is returned.
        """
        alerts: list[HealthAlert] = []

        if not self._has_error_data(stats):
            return alerts

        total_errors, total_operations = self._component_error_totals(stats)
        if total_operations <= 0:
            return alerts

        error_rate = (total_errors / total_operations) * 1000
        limits = self.thresholds

        if error_rate >= limits.error_rate_critical:
            alerts.append(
                self._build_alert(
                    AlertLevel.CRITICAL,
                    "errors",
                    f"Critical error rate detected: {error_rate:.1f} errors per 1000 operations",
                    "error_rate",
                    error_rate,
                    limits.error_rate_critical,
                    "Investigate error sources immediately and implement fixes",
                )
            )
        elif error_rate >= limits.error_rate_warning:
            alerts.append(
                self._build_alert(
                    AlertLevel.DEGRADED,
                    "errors",
                    f"Elevated error rate: {error_rate:.1f} errors per 1000 operations",
                    "error_rate",
                    error_rate,
                    limits.error_rate_warning,
                    "Monitor error patterns and consider implementing error handling improvements",
                )
            )
        elif error_rate >= limits.error_rate_good:
            alerts.append(
                self._build_alert(
                    AlertLevel.WARNING,
                    "errors",
                    f"Increased error rate: {error_rate:.1f} errors per 1000 operations",
                    "error_rate",
                    error_rate,
                    limits.error_rate_good,
                    "Review recent changes and monitor error trends",
                )
            )

        return alerts

    async def _check_performance_alerts(
        self, stats: ComprehensiveStats
    ) -> list[HealthAlert]:
        """
        Generate alerts for performance metrics.

        Only the suite-level ``avg_response_time_ms`` is inspected; at most
        one alert (the most severe that applies) is returned.
        """
        alerts: list[HealthAlert] = []

        if not self._has_performance_data(stats):
            return alerts

        avg_response_time = stats["suite"].get("avg_response_time_ms", 0.0)
        limits = self.thresholds

        if avg_response_time >= limits.response_time_critical:
            alerts.append(
                self._build_alert(
                    AlertLevel.CRITICAL,
                    "performance",
                    f"Critical response time: {avg_response_time:.0f}ms average",
                    "avg_response_time_ms",
                    avg_response_time,
                    limits.response_time_critical,
                    "Investigate performance bottlenecks and optimize critical paths",
                )
            )
        elif avg_response_time >= limits.response_time_warning:
            alerts.append(
                self._build_alert(
                    AlertLevel.DEGRADED,
                    "performance",
                    f"Slow response time: {avg_response_time:.0f}ms average",
                    "avg_response_time_ms",
                    avg_response_time,
                    limits.response_time_warning,
                    "Profile application performance and consider caching optimizations",
                )
            )
        elif avg_response_time >= limits.response_time_good:
            alerts.append(
                self._build_alert(
                    AlertLevel.WARNING,
                    "performance",
                    f"Elevated response time: {avg_response_time:.0f}ms average",
                    "avg_response_time_ms",
                    avg_response_time,
                    limits.response_time_good,
                    "Monitor performance trends and review recent deployments",
                )
            )

        return alerts

    async def _check_connection_alerts(
        self, stats: ComprehensiveStats
    ) -> list[HealthAlert]:
        """
        Generate alerts for connection stability.

        Produces a critical alert listing every hub whose flag is missing
        or false, plus at most one alert for the reconnection rate.
        """
        alerts: list[HealthAlert] = []

        # With no connection data at all the scoring side assumes a healthy
        # connection, so reporting a critical failure here would contradict
        # the score.
        if not self._has_connection_data(stats):
            return alerts

        # Check basic connectivity
        suite = stats["suite"]
        hub_flags = (
            ("realtime", suite.get("realtime_connected", False)),
            ("user_hub", suite.get("user_hub_connected", False)),
            ("market_hub", suite.get("market_hub_connected", False)),
        )
        disconnected_hubs = [name for name, connected in hub_flags if not connected]

        if disconnected_hubs:
            alerts.append(
                self._build_alert(
                    AlertLevel.CRITICAL,
                    "connection",
                    f"Connection failure: {', '.join(disconnected_hubs)} disconnected",
                    "connection_status",
                    f"{len(disconnected_hubs)} disconnected",
                    0,
                    "Check network connectivity and authentication credentials",
                )
            )

        # Check reconnection rates
        reconnections_per_hour = self._reconnections_per_hour(stats)
        if reconnections_per_hour is not None:
            limits = self.thresholds
            if reconnections_per_hour >= limits.reconnection_critical:
                alerts.append(
                    self._build_alert(
                        AlertLevel.CRITICAL,
                        "connection",
                        f"Excessive reconnections: {reconnections_per_hour:.1f} per hour",
                        "reconnections_per_hour",
                        reconnections_per_hour,
                        limits.reconnection_critical,
                        "Investigate network stability and connection handling",
                    )
                )
            elif reconnections_per_hour >= limits.reconnection_warning:
                alerts.append(
                    self._build_alert(
                        AlertLevel.DEGRADED,
                        "connection",
                        f"Frequent reconnections: {reconnections_per_hour:.1f} per hour",
                        "reconnections_per_hour",
                        reconnections_per_hour,
                        limits.reconnection_warning,
                        "Monitor network conditions and consider connection timeout adjustments",
                    )
                )

        return alerts

    async def _check_resource_alerts(
        self, stats: ComprehensiveStats
    ) -> list[HealthAlert]:
        """
        Generate alerts for resource usage.

        Only memory usage is inspected; at most one alert is returned.
        """
        alerts: list[HealthAlert] = []

        if "memory" not in stats:
            return alerts

        memory_usage_percent = stats["memory"].get("memory_usage_percent", 0.0)
        limits = self.thresholds

        if memory_usage_percent >= limits.memory_usage_critical:
            alerts.append(
                self._build_alert(
                    AlertLevel.CRITICAL,
                    "resources",
                    f"Critical memory usage: {memory_usage_percent:.1f}%",
                    "memory_usage_percent",
                    memory_usage_percent,
                    limits.memory_usage_critical,
                    "Immediately review memory leaks and restart if necessary",
                )
            )
        elif memory_usage_percent >= limits.memory_usage_warning:
            alerts.append(
                self._build_alert(
                    AlertLevel.DEGRADED,
                    "resources",
                    f"High memory usage: {memory_usage_percent:.1f}%",
                    "memory_usage_percent",
                    memory_usage_percent,
                    limits.memory_usage_warning,
                    "Monitor memory trends and consider implementing cleanup routines",
                )
            )
        elif memory_usage_percent >= limits.memory_usage_good:
            alerts.append(
                self._build_alert(
                    AlertLevel.WARNING,
                    "resources",
                    f"Elevated memory usage: {memory_usage_percent:.1f}%",
                    "memory_usage_percent",
                    memory_usage_percent,
                    limits.memory_usage_good,
                    "Review memory usage patterns and optimize data structures",
                )
            )

        return alerts

    async def _check_data_quality_alerts(
        self, stats: ComprehensiveStats
    ) -> list[HealthAlert]:
        """
        Generate alerts for data quality issues.

        Only ``data_manager`` validation errors are inspected; at most one
        alert is returned.
        """
        alerts: list[HealthAlert] = []

        # Check data validation errors
        if "data_manager" not in stats:
            return alerts

        data_stats = stats["data_manager"]
        validation_errors = data_stats.get("data_validation_errors", 0)
        total_data_points = data_stats.get("bars_processed", 0) + data_stats.get(
            "ticks_processed", 0
        )
        if total_data_points <= 0:
            return alerts

        validation_error_rate = (validation_errors / total_data_points) * 1000
        limits = self.thresholds

        if validation_error_rate >= limits.validation_error_critical:
            alerts.append(
                self._build_alert(
                    AlertLevel.CRITICAL,
                    "data_quality",
                    f"Critical data validation error rate: {validation_error_rate:.1f} per 1000 data points",
                    "validation_error_rate",
                    validation_error_rate,
                    limits.validation_error_critical,
                    "Investigate data sources and validation logic immediately",
                )
            )
        elif validation_error_rate >= limits.validation_error_warning:
            alerts.append(
                self._build_alert(
                    AlertLevel.DEGRADED,
                    "data_quality",
                    f"High data validation error rate: {validation_error_rate:.1f} per 1000 data points",
                    "validation_error_rate",
                    validation_error_rate,
                    limits.validation_error_warning,
                    "Review data validation rules and data source quality",
                )
            )

        return alerts

    # Helper methods for checking data availability

    def _has_error_data(self, stats: ComprehensiveStats) -> bool:
        """Check if any component reports errors or performance metrics."""
        return any(
            comp_stats.get("error_count", 0) > 0 or "performance_metrics" in comp_stats
            for comp_stats in stats["suite"]["components"].values()
        )

    def _has_performance_data(self, stats: ComprehensiveStats) -> bool:
        """Check if a response time or any performance metric is available."""
        return stats["suite"].get("avg_response_time_ms", 0.0) > 0 or any(
            "performance_metrics" in comp_stats
            for comp_stats in stats["suite"]["components"].values()
        )

    def _has_connection_data(self, stats: ComprehensiveStats) -> bool:
        """Check if any connection flag or realtime statistics are present."""
        suite_data = stats["suite"]
        return (
            "realtime_connected" in suite_data
            or "user_hub_connected" in suite_data  # type: ignore[unreachable]
            or "market_hub_connected" in suite_data
            or "realtime" in stats
        )

    def _has_resource_data(self, stats: ComprehensiveStats) -> bool:
        """Check if memory or cache hit rate information is available."""
        return (
            "memory" in stats
            or stats["suite"].get("memory_usage_mb", 0.0) > 0
            or "cache_hit_rate" in stats["suite"]
        )

    def _has_data_quality_data(self, stats: ComprehensiveStats) -> bool:
        """Check if data manager or orderbook statistics are available."""
        return "data_manager" in stats or "orderbook" in stats

    # Cache management methods

    async def _get_cached_value(self, cache_key: str) -> Any | None:
        """Return the cached value for ``cache_key``, or None if absent or expired."""
        async with self._lock:
            if cache_key in self._cache:
                value, timestamp = self._cache[cache_key]
                if time.time() - timestamp < self._cache_ttl:
                    return value
            return None

    async def _set_cached_value(self, cache_key: str, value: Any) -> None:
        """Store ``value`` under ``cache_key`` with the current timestamp."""
        async with self._lock:
            self._cache[cache_key] = (value, time.time())


# Names exported by ``from project_x_py.statistics.health import *``.
__all__ = [
    "HealthMonitor",
    "HealthThresholds",
    "HealthAlert",
    "HealthBreakdown",
    "AlertLevel",
]