Add scripts/health_checker.py
Some checks failed
📺 Generate M3U Playlist / build (push) Has been cancelled
Some checks failed
📺 Generate M3U Playlist / build (push) Has been cancelled
This commit is contained in:
parent
2bd6b74bb7
commit
c6d5674ce2
1 changed files with 82 additions and 0 deletions
82
scripts/health_checker.py
Normal file
82
scripts/health_checker.py
Normal file
|
@ -0,0 +1,82 @@
|
|||
"""
|
||||
Health Checker - Optional feature to check channel URL health
|
||||
"""
|
||||
|
||||
import logging
|
||||
from concurrent.futures import ThreadPoolExecutor, as_completed
|
||||
from typing import Dict, List, Tuple
|
||||
from urllib.request import Request, urlopen
|
||||
from urllib.error import URLError, HTTPError
|
||||
|
||||
class HealthChecker:
|
||||
"""Check channel URL health status."""
|
||||
|
||||
def __init__(self, config):
|
||||
self.config = config
|
||||
self.logger = logging.getLogger(__name__)
|
||||
self.timeout = config.settings.get('health_check_timeout', 5)
|
||||
|
||||
def check_channel_health(self, url: str) -> Tuple[bool, str]:
|
||||
"""Check if a channel URL is accessible."""
|
||||
try:
|
||||
req = Request(url, headers={'User-Agent': 'IPTV-Checker/1.0'})
|
||||
|
||||
with urlopen(req, timeout=self.timeout) as response:
|
||||
status_code = response.getcode()
|
||||
if status_code == 200:
|
||||
return True, f"OK ({status_code})"
|
||||
else:
|
||||
return False, f"HTTP {status_code}"
|
||||
|
||||
except HTTPError as e:
|
||||
return False, f"HTTP {e.code}"
|
||||
except URLError as e:
|
||||
return False, f"Connection error: {str(e.reason)}"
|
||||
except Exception as e:
|
||||
return False, f"Error: {str(e)}"
|
||||
|
||||
def batch_health_check(self, channels: List[Dict]) -> Dict[str, Tuple[bool, str]]:
|
||||
"""Perform health checks on multiple channels concurrently."""
|
||||
if not self.config.settings.get('enable_health_check', False):
|
||||
self.logger.info("Health check disabled in settings")
|
||||
return {}
|
||||
|
||||
self.logger.info("Starting batch health check...")
|
||||
results = {}
|
||||
max_workers = self.config.settings.get('max_workers', 4)
|
||||
|
||||
def check_single(channel):
|
||||
url = channel.get('Stream URL', '')
|
||||
name = channel.get('Stream name', 'Unknown')
|
||||
is_healthy, status = self.check_channel_health(url)
|
||||
return name, (is_healthy, status)
|
||||
|
||||
# Limit to first 100 channels for performance
|
||||
channels_to_check = channels[:100]
|
||||
|
||||
with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
||||
future_to_channel = {
|
||||
executor.submit(check_single, channel): channel
|
||||
for channel in channels_to_check
|
||||
}
|
||||
|
||||
completed = 0
|
||||
for future in as_completed(future_to_channel):
|
||||
try:
|
||||
name, result = future.result()
|
||||
results[name] = result
|
||||
completed += 1
|
||||
|
||||
if completed % 10 == 0:
|
||||
self.logger.info(f"Health check progress: {completed}/{len(channels_to_check)}")
|
||||
|
||||
except Exception as e:
|
||||
self.logger.warning(f"Health check failed: {e}")
|
||||
|
||||
healthy_count = sum(1 for is_healthy, _ in results.values() if is_healthy)
|
||||
total_checked = len(results)
|
||||
success_rate = (healthy_count / total_checked * 100) if total_checked > 0 else 0
|
||||
|
||||
self.logger.info(f"Health check complete: {healthy_count}/{total_checked} channels healthy ({success_rate:.1f}%)")
|
||||
|
||||
return results
|
Loading…
Add table
Add a link
Reference in a new issue