my-private-iptv-m3u/scripts/health_checker.py

274 lines
10 KiB
Python
Raw Permalink Normal View History

2025-06-28 02:05:36 +02:00
#!/usr/bin/env python3
2025-06-27 23:31:27 +02:00
"""
2025-06-28 02:22:49 +02:00
Health Checker - Simple health checking without external dependencies
2025-06-27 23:31:27 +02:00
"""
import logging
2025-06-28 02:22:49 +02:00
import urllib.request
import urllib.error
import socket
import time
2025-06-28 02:16:03 +02:00
import concurrent.futures
from typing import Dict, List, Optional
2025-06-27 23:31:27 +02:00
2025-06-28 02:16:03 +02:00
class HealthChecker:
2025-06-28 02:22:49 +02:00
"""Simple health checker using only standard library."""
2025-06-27 23:31:27 +02:00
2025-06-28 02:16:03 +02:00
def __init__(self, config):
2025-06-27 23:31:27 +02:00
self.config = config
self.logger = logging.getLogger(__name__)
2025-06-28 02:16:03 +02:00
self.timeout = config.settings.get('health_check_timeout', 5)
self.max_workers = config.settings.get('max_workers', 4)
2025-06-28 02:05:36 +02:00
2025-06-28 02:22:49 +02:00
# Set default socket timeout
socket.setdefaulttimeout(self.timeout)
2025-06-28 02:16:03 +02:00
def check_single_url(self, url: str) -> Dict:
2025-06-28 02:22:49 +02:00
"""Check a single URL for accessibility using urllib."""
2025-06-28 02:16:03 +02:00
start_time = time.time()
2025-06-28 02:05:36 +02:00
2025-06-27 23:31:27 +02:00
try:
2025-06-28 02:22:49 +02:00
# Create request with proper headers
req = urllib.request.Request(
url,
2025-06-28 02:16:03 +02:00
headers={'User-Agent': 'IPTV-Health-Checker/1.0'}
)
2025-06-27 23:31:27 +02:00
2025-06-28 02:22:49 +02:00
# Try to open the URL
with urllib.request.urlopen(req, timeout=self.timeout) as response:
response_time = time.time() - start_time
status_code = response.getcode()
return {
'url': url,
'status': 'healthy' if status_code < 400 else 'unhealthy',
'status_code': status_code,
'response_time': round(response_time, 2),
'error': None
}
except urllib.error.HTTPError as e:
2025-06-28 02:16:03 +02:00
return {
'url': url,
2025-06-28 02:22:49 +02:00
'status': 'unhealthy',
'status_code': e.code,
'response_time': time.time() - start_time,
'error': f'HTTP {e.code}: {e.reason}'
2025-06-28 02:16:03 +02:00
}
2025-06-27 23:31:27 +02:00
2025-06-28 02:22:49 +02:00
except urllib.error.URLError as e:
2025-06-28 02:16:03 +02:00
return {
'url': url,
2025-06-28 02:22:49 +02:00
'status': 'unreachable',
2025-06-28 02:16:03 +02:00
'status_code': None,
2025-06-28 02:22:49 +02:00
'response_time': time.time() - start_time,
'error': f'URL Error: {e.reason}'
2025-06-28 02:16:03 +02:00
}
2025-06-28 02:05:36 +02:00
2025-06-28 02:22:49 +02:00
except socket.timeout:
2025-06-28 02:16:03 +02:00
return {
'url': url,
2025-06-28 02:22:49 +02:00
'status': 'timeout',
2025-06-28 02:16:03 +02:00
'status_code': None,
2025-06-28 02:22:49 +02:00
'response_time': self.timeout,
'error': 'Request timeout'
2025-06-28 02:16:03 +02:00
}
2025-06-28 02:05:36 +02:00
except Exception as e:
2025-06-28 02:16:03 +02:00
return {
'url': url,
'status': 'error',
'status_code': None,
'response_time': time.time() - start_time,
'error': str(e)
}
2025-06-28 02:05:36 +02:00
2025-06-28 02:16:03 +02:00
def check_channel_health(self, channel: Dict) -> Dict:
"""Check health of a single channel."""
url = channel.get('Stream URL', '')
2025-06-28 02:05:36 +02:00
2025-06-28 02:16:03 +02:00
if not url:
return {
'channel_name': channel.get('Stream name', 'Unknown'),
'url': '',
'status': 'no_url',
'status_code': None,
'response_time': 0,
'error': 'No URL provided'
}
2025-06-28 02:05:36 +02:00
2025-06-28 02:16:03 +02:00
result = self.check_single_url(url)
result['channel_name'] = channel.get('Stream name', 'Unknown')
2025-06-28 02:05:36 +02:00
2025-06-28 02:16:03 +02:00
return result
2025-06-28 02:05:36 +02:00
2025-06-28 02:16:03 +02:00
def batch_health_check(self, channels: List[Dict]) -> Dict:
"""Perform batch health check on multiple channels."""
if not self.config.settings.get('enable_health_check', False):
self.logger.info("Health checking is disabled")
2025-06-28 02:22:49 +02:00
return {
'enabled': False,
'results': [],
'summary': {
'total': len(channels),
'healthy': 0,
'health_percentage': 0,
'total_check_time': 0
}
}
2025-06-28 02:05:36 +02:00
2025-06-28 02:16:03 +02:00
self.logger.info(f"Starting health check for {len(channels)} channels...")
2025-06-28 02:05:36 +02:00
2025-06-28 02:16:03 +02:00
start_time = time.time()
results = []
2025-06-28 02:05:36 +02:00
2025-06-28 02:22:49 +02:00
# Limit concurrent checks to avoid overwhelming servers
max_workers = min(self.max_workers, len(channels))
if max_workers > 1:
# Use ThreadPoolExecutor for concurrent checks
try:
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
# Submit all health check tasks
future_to_channel = {
executor.submit(self.check_channel_health, channel): channel
for channel in channels
}
# Collect results as they complete
for future in concurrent.futures.as_completed(future_to_channel, timeout=60):
try:
result = future.result(timeout=10)
results.append(result)
except Exception as e:
channel = future_to_channel[future]
self.logger.warning(f"Health check failed for {channel.get('Stream name', 'Unknown')}: {e}")
results.append({
'channel_name': channel.get('Stream name', 'Unknown'),
'url': channel.get('Stream URL', ''),
'status': 'error',
'status_code': None,
'response_time': 0,
'error': str(e)
})
except Exception as e:
self.logger.error(f"Concurrent health check failed: {e}")
# Fall back to sequential processing
for channel in channels:
try:
result = self.check_channel_health(channel)
results.append(result)
except Exception as channel_error:
self.logger.warning(f"Channel check failed: {channel_error}")
else:
# Sequential processing for single worker
for channel in channels:
2025-06-28 02:16:03 +02:00
try:
2025-06-28 02:22:49 +02:00
result = self.check_channel_health(channel)
2025-06-28 02:16:03 +02:00
results.append(result)
except Exception as e:
2025-06-28 02:22:49 +02:00
self.logger.warning(f"Health check failed for {channel.get('Stream name', 'Unknown')}: {e}")
2025-06-28 02:16:03 +02:00
results.append({
'channel_name': channel.get('Stream name', 'Unknown'),
'url': channel.get('Stream URL', ''),
'status': 'error',
'status_code': None,
'response_time': 0,
'error': str(e)
})
2025-06-28 02:05:36 +02:00
2025-06-28 02:16:03 +02:00
total_time = time.time() - start_time
2025-06-28 02:05:36 +02:00
2025-06-28 02:16:03 +02:00
# Generate summary statistics
summary = self._generate_health_summary(results, total_time)
2025-06-28 02:05:36 +02:00
2025-06-28 02:16:03 +02:00
self.logger.info(f"Health check completed in {total_time:.1f}s: "
f"{summary['healthy']}/{summary['total']} channels healthy")
2025-06-28 02:05:36 +02:00
return {
2025-06-28 02:16:03 +02:00
'enabled': True,
'results': results,
'summary': summary,
'total_time': total_time
2025-06-28 02:05:36 +02:00
}
2025-06-28 02:16:03 +02:00
def _generate_health_summary(self, results: List[Dict], total_time: float) -> Dict:
"""Generate summary statistics from health check results."""
total = len(results)
healthy = sum(1 for r in results if r['status'] == 'healthy')
unhealthy = sum(1 for r in results if r['status'] == 'unhealthy')
timeout = sum(1 for r in results if r['status'] == 'timeout')
unreachable = sum(1 for r in results if r['status'] == 'unreachable')
errors = sum(1 for r in results if r['status'] == 'error')
no_url = sum(1 for r in results if r['status'] == 'no_url')
2025-06-28 02:05:36 +02:00
2025-06-28 02:16:03 +02:00
# Calculate average response time for successful checks
successful_times = [r['response_time'] for r in results if r['status'] == 'healthy']
avg_response_time = sum(successful_times) / len(successful_times) if successful_times else 0
2025-06-28 02:05:36 +02:00
return {
2025-06-28 02:16:03 +02:00
'total': total,
'healthy': healthy,
'unhealthy': unhealthy,
'timeout': timeout,
'unreachable': unreachable,
'errors': errors,
'no_url': no_url,
'health_percentage': round((healthy / total * 100) if total > 0 else 0, 1),
'avg_response_time': round(avg_response_time, 2),
'total_check_time': round(total_time, 1)
2025-06-28 02:05:36 +02:00
}
2025-06-28 02:16:03 +02:00
def get_unhealthy_channels(self, health_results: Dict) -> List[Dict]:
"""Get list of unhealthy channels for reporting."""
if not health_results.get('enabled', False):
return []
unhealthy = []
for result in health_results.get('results', []):
if result['status'] != 'healthy':
unhealthy.append({
'name': result['channel_name'],
'url': result['url'],
'status': result['status'],
'error': result.get('error', 'Unknown error')
2025-06-28 02:05:36 +02:00
})
2025-06-28 02:16:03 +02:00
return unhealthy
2025-06-28 02:05:36 +02:00
2025-06-28 02:16:03 +02:00
def save_health_report(self, health_results: Dict, filename: str = None) -> Optional[str]:
"""Save health check results to a file."""
if not health_results.get('enabled', False):
return None
2025-06-28 02:05:36 +02:00
2025-06-28 02:16:03 +02:00
import json
from datetime import datetime
from pathlib import Path
2025-06-28 02:05:36 +02:00
if filename is None:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
2025-06-28 02:16:03 +02:00
filename = f'health_check_{timestamp}.json'
2025-06-28 02:05:36 +02:00
2025-06-28 02:16:03 +02:00
reports_dir = Path('reports')
2025-06-28 02:05:36 +02:00
reports_dir.mkdir(exist_ok=True)
report_path = reports_dir / filename
try:
2025-06-28 02:16:03 +02:00
# Prepare report data
report_data = {
'timestamp': datetime.now().isoformat(),
'summary': health_results['summary'],
'unhealthy_channels': self.get_unhealthy_channels(health_results),
'total_time': health_results['total_time']
}
2025-06-28 02:05:36 +02:00
with open(report_path, 'w', encoding='utf-8') as f:
2025-06-28 02:16:03 +02:00
json.dump(report_data, f, indent=2)
2025-06-28 02:05:36 +02:00
self.logger.info(f"Health report saved to: {report_path}")
2025-06-28 02:16:03 +02:00
return str(report_path)
2025-06-28 02:05:36 +02:00
except Exception as e:
self.logger.error(f"Could not save health report: {e}")
2025-06-28 02:22:49 +02:00
return None