""" Health Checker - Optional feature to check channel URL health """ import logging from concurrent.futures import ThreadPoolExecutor, as_completed from typing import Dict, List, Tuple from urllib.request import Request, urlopen from urllib.error import URLError, HTTPError class HealthChecker: """Check channel URL health status.""" def __init__(self, config): self.config = config self.logger = logging.getLogger(__name__) self.timeout = config.settings.get('health_check_timeout', 5) def check_channel_health(self, url: str) -> Tuple[bool, str]: """Check if a channel URL is accessible.""" try: req = Request(url, headers={'User-Agent': 'IPTV-Checker/1.0'}) with urlopen(req, timeout=self.timeout) as response: status_code = response.getcode() if status_code == 200: return True, f"OK ({status_code})" else: return False, f"HTTP {status_code}" except HTTPError as e: return False, f"HTTP {e.code}" except URLError as e: return False, f"Connection error: {str(e.reason)}" except Exception as e: return False, f"Error: {str(e)}" def batch_health_check(self, channels: List[Dict]) -> Dict[str, Tuple[bool, str]]: """Perform health checks on multiple channels concurrently.""" if not self.config.settings.get('enable_health_check', False): self.logger.info("Health check disabled in settings") return {} self.logger.info("Starting batch health check...") results = {} max_workers = self.config.settings.get('max_workers', 4) def check_single(channel): url = channel.get('Stream URL', '') name = channel.get('Stream name', 'Unknown') is_healthy, status = self.check_channel_health(url) return name, (is_healthy, status) # Limit to first 100 channels for performance channels_to_check = channels[:100] with ThreadPoolExecutor(max_workers=max_workers) as executor: future_to_channel = { executor.submit(check_single, channel): channel for channel in channels_to_check } completed = 0 for future in as_completed(future_to_channel): try: name, result = future.result() results[name] = result completed += 1 if completed % 10 == 0: self.logger.info(f"Health check progress: {completed}/{len(channels_to_check)}") except Exception as e: self.logger.warning(f"Health check failed: {e}") healthy_count = sum(1 for is_healthy, _ in results.values() if is_healthy) total_checked = len(results) success_rate = (healthy_count / total_checked * 100) if total_checked > 0 else 0 self.logger.info(f"Health check complete: {healthy_count}/{total_checked} channels healthy ({success_rate:.1f}%)") return results