diff --git a/scripts/health_checker.py b/scripts/health_checker.py new file mode 100644 index 0000000..27cf855 --- /dev/null +++ b/scripts/health_checker.py @@ -0,0 +1,82 @@ +""" +Health Checker - Optional feature to check channel URL health +""" + +import logging +from concurrent.futures import ThreadPoolExecutor, as_completed +from typing import Dict, List, Tuple +from urllib.request import Request, urlopen +from urllib.error import URLError, HTTPError + +class HealthChecker: + """Check channel URL health status.""" + + def __init__(self, config): + self.config = config + self.logger = logging.getLogger(__name__) + self.timeout = config.settings.get('health_check_timeout', 5) + + def check_channel_health(self, url: str) -> Tuple[bool, str]: + """Check if a channel URL is accessible.""" + try: + req = Request(url, headers={'User-Agent': 'IPTV-Checker/1.0'}) + + with urlopen(req, timeout=self.timeout) as response: + status_code = response.getcode() + if status_code == 200: + return True, f"OK ({status_code})" + else: + return False, f"HTTP {status_code}" + + except HTTPError as e: + return False, f"HTTP {e.code}" + except URLError as e: + return False, f"Connection error: {str(e.reason)}" + except Exception as e: + return False, f"Error: {str(e)}" + + def batch_health_check(self, channels: List[Dict]) -> Dict[str, Tuple[bool, str]]: + """Perform health checks on multiple channels concurrently.""" + if not self.config.settings.get('enable_health_check', False): + self.logger.info("Health check disabled in settings") + return {} + + self.logger.info("Starting batch health check...") + results = {} + max_workers = self.config.settings.get('max_workers', 4) + + def check_single(channel): + url = channel.get('Stream URL', '') + name = channel.get('Stream name', 'Unknown') + is_healthy, status = self.check_channel_health(url) + return name, (is_healthy, status) + + # Limit to first 100 channels for performance + channels_to_check = channels[:100] + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + future_to_channel = { + executor.submit(check_single, channel): channel + for channel in channels_to_check + } + + completed = 0 + for future in as_completed(future_to_channel): + try: + name, result = future.result() + results[name] = result + completed += 1 + + if completed % 10 == 0: + self.logger.info(f"Health check progress: {completed}/{len(channels_to_check)}") + + except Exception as e: + self.logger.warning(f"Health check failed: {e}") + + healthy_count = sum(1 for is_healthy, _ in results.values() if is_healthy) + total_checked = len(results) + success_rate = (healthy_count / total_checked * 100) if total_checked > 0 else 0 + + self.logger.info(f"Health check complete: {healthy_count}/{total_checked} channels healthy ({success_rate:.1f}%)") + + return results \ No newline at end of file