All checks were successful
Generate M3U Playlist / build (push) Successful in 2m7s
274 lines
No EOL
10 KiB
Python
274 lines
No EOL
10 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Health Checker - Simple health checking without external dependencies
|
|
"""
|
|
|
|
import logging
|
|
import urllib.request
|
|
import urllib.error
|
|
import socket
|
|
import time
|
|
import concurrent.futures
|
|
from typing import Dict, List, Optional
|
|
|
|
class HealthChecker:
|
|
"""Simple health checker using only standard library."""
|
|
|
|
def __init__(self, config):
|
|
self.config = config
|
|
self.logger = logging.getLogger(__name__)
|
|
self.timeout = config.settings.get('health_check_timeout', 5)
|
|
self.max_workers = config.settings.get('max_workers', 4)
|
|
|
|
# Set default socket timeout
|
|
socket.setdefaulttimeout(self.timeout)
|
|
|
|
def check_single_url(self, url: str) -> Dict:
|
|
"""Check a single URL for accessibility using urllib."""
|
|
start_time = time.time()
|
|
|
|
try:
|
|
# Create request with proper headers
|
|
req = urllib.request.Request(
|
|
url,
|
|
headers={'User-Agent': 'IPTV-Health-Checker/1.0'}
|
|
)
|
|
|
|
# Try to open the URL
|
|
with urllib.request.urlopen(req, timeout=self.timeout) as response:
|
|
response_time = time.time() - start_time
|
|
status_code = response.getcode()
|
|
|
|
return {
|
|
'url': url,
|
|
'status': 'healthy' if status_code < 400 else 'unhealthy',
|
|
'status_code': status_code,
|
|
'response_time': round(response_time, 2),
|
|
'error': None
|
|
}
|
|
|
|
except urllib.error.HTTPError as e:
|
|
return {
|
|
'url': url,
|
|
'status': 'unhealthy',
|
|
'status_code': e.code,
|
|
'response_time': time.time() - start_time,
|
|
'error': f'HTTP {e.code}: {e.reason}'
|
|
}
|
|
|
|
except urllib.error.URLError as e:
|
|
return {
|
|
'url': url,
|
|
'status': 'unreachable',
|
|
'status_code': None,
|
|
'response_time': time.time() - start_time,
|
|
'error': f'URL Error: {e.reason}'
|
|
}
|
|
|
|
except socket.timeout:
|
|
return {
|
|
'url': url,
|
|
'status': 'timeout',
|
|
'status_code': None,
|
|
'response_time': self.timeout,
|
|
'error': 'Request timeout'
|
|
}
|
|
|
|
except Exception as e:
|
|
return {
|
|
'url': url,
|
|
'status': 'error',
|
|
'status_code': None,
|
|
'response_time': time.time() - start_time,
|
|
'error': str(e)
|
|
}
|
|
|
|
def check_channel_health(self, channel: Dict) -> Dict:
|
|
"""Check health of a single channel."""
|
|
url = channel.get('Stream URL', '')
|
|
|
|
if not url:
|
|
return {
|
|
'channel_name': channel.get('Stream name', 'Unknown'),
|
|
'url': '',
|
|
'status': 'no_url',
|
|
'status_code': None,
|
|
'response_time': 0,
|
|
'error': 'No URL provided'
|
|
}
|
|
|
|
result = self.check_single_url(url)
|
|
result['channel_name'] = channel.get('Stream name', 'Unknown')
|
|
|
|
return result
|
|
|
|
def batch_health_check(self, channels: List[Dict]) -> Dict:
|
|
"""Perform batch health check on multiple channels."""
|
|
if not self.config.settings.get('enable_health_check', False):
|
|
self.logger.info("Health checking is disabled")
|
|
return {
|
|
'enabled': False,
|
|
'results': [],
|
|
'summary': {
|
|
'total': len(channels),
|
|
'healthy': 0,
|
|
'health_percentage': 0,
|
|
'total_check_time': 0
|
|
}
|
|
}
|
|
|
|
self.logger.info(f"Starting health check for {len(channels)} channels...")
|
|
|
|
start_time = time.time()
|
|
results = []
|
|
|
|
# Limit concurrent checks to avoid overwhelming servers
|
|
max_workers = min(self.max_workers, len(channels))
|
|
|
|
if max_workers > 1:
|
|
# Use ThreadPoolExecutor for concurrent checks
|
|
try:
|
|
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
|
|
# Submit all health check tasks
|
|
future_to_channel = {
|
|
executor.submit(self.check_channel_health, channel): channel
|
|
for channel in channels
|
|
}
|
|
|
|
# Collect results as they complete
|
|
for future in concurrent.futures.as_completed(future_to_channel, timeout=60):
|
|
try:
|
|
result = future.result(timeout=10)
|
|
results.append(result)
|
|
except Exception as e:
|
|
channel = future_to_channel[future]
|
|
self.logger.warning(f"Health check failed for {channel.get('Stream name', 'Unknown')}: {e}")
|
|
results.append({
|
|
'channel_name': channel.get('Stream name', 'Unknown'),
|
|
'url': channel.get('Stream URL', ''),
|
|
'status': 'error',
|
|
'status_code': None,
|
|
'response_time': 0,
|
|
'error': str(e)
|
|
})
|
|
except Exception as e:
|
|
self.logger.error(f"Concurrent health check failed: {e}")
|
|
# Fall back to sequential processing
|
|
for channel in channels:
|
|
try:
|
|
result = self.check_channel_health(channel)
|
|
results.append(result)
|
|
except Exception as channel_error:
|
|
self.logger.warning(f"Channel check failed: {channel_error}")
|
|
else:
|
|
# Sequential processing for single worker
|
|
for channel in channels:
|
|
try:
|
|
result = self.check_channel_health(channel)
|
|
results.append(result)
|
|
except Exception as e:
|
|
self.logger.warning(f"Health check failed for {channel.get('Stream name', 'Unknown')}: {e}")
|
|
results.append({
|
|
'channel_name': channel.get('Stream name', 'Unknown'),
|
|
'url': channel.get('Stream URL', ''),
|
|
'status': 'error',
|
|
'status_code': None,
|
|
'response_time': 0,
|
|
'error': str(e)
|
|
})
|
|
|
|
total_time = time.time() - start_time
|
|
|
|
# Generate summary statistics
|
|
summary = self._generate_health_summary(results, total_time)
|
|
|
|
self.logger.info(f"Health check completed in {total_time:.1f}s: "
|
|
f"{summary['healthy']}/{summary['total']} channels healthy")
|
|
|
|
return {
|
|
'enabled': True,
|
|
'results': results,
|
|
'summary': summary,
|
|
'total_time': total_time
|
|
}
|
|
|
|
def _generate_health_summary(self, results: List[Dict], total_time: float) -> Dict:
|
|
"""Generate summary statistics from health check results."""
|
|
total = len(results)
|
|
healthy = sum(1 for r in results if r['status'] == 'healthy')
|
|
unhealthy = sum(1 for r in results if r['status'] == 'unhealthy')
|
|
timeout = sum(1 for r in results if r['status'] == 'timeout')
|
|
unreachable = sum(1 for r in results if r['status'] == 'unreachable')
|
|
errors = sum(1 for r in results if r['status'] == 'error')
|
|
no_url = sum(1 for r in results if r['status'] == 'no_url')
|
|
|
|
# Calculate average response time for successful checks
|
|
successful_times = [r['response_time'] for r in results if r['status'] == 'healthy']
|
|
avg_response_time = sum(successful_times) / len(successful_times) if successful_times else 0
|
|
|
|
return {
|
|
'total': total,
|
|
'healthy': healthy,
|
|
'unhealthy': unhealthy,
|
|
'timeout': timeout,
|
|
'unreachable': unreachable,
|
|
'errors': errors,
|
|
'no_url': no_url,
|
|
'health_percentage': round((healthy / total * 100) if total > 0 else 0, 1),
|
|
'avg_response_time': round(avg_response_time, 2),
|
|
'total_check_time': round(total_time, 1)
|
|
}
|
|
|
|
def get_unhealthy_channels(self, health_results: Dict) -> List[Dict]:
|
|
"""Get list of unhealthy channels for reporting."""
|
|
if not health_results.get('enabled', False):
|
|
return []
|
|
|
|
unhealthy = []
|
|
for result in health_results.get('results', []):
|
|
if result['status'] != 'healthy':
|
|
unhealthy.append({
|
|
'name': result['channel_name'],
|
|
'url': result['url'],
|
|
'status': result['status'],
|
|
'error': result.get('error', 'Unknown error')
|
|
})
|
|
|
|
return unhealthy
|
|
|
|
def save_health_report(self, health_results: Dict, filename: str = None) -> Optional[str]:
|
|
"""Save health check results to a file."""
|
|
if not health_results.get('enabled', False):
|
|
return None
|
|
|
|
import json
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
|
|
if filename is None:
|
|
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
|
filename = f'health_check_{timestamp}.json'
|
|
|
|
reports_dir = Path('reports')
|
|
reports_dir.mkdir(exist_ok=True)
|
|
|
|
report_path = reports_dir / filename
|
|
|
|
try:
|
|
# Prepare report data
|
|
report_data = {
|
|
'timestamp': datetime.now().isoformat(),
|
|
'summary': health_results['summary'],
|
|
'unhealthy_channels': self.get_unhealthy_channels(health_results),
|
|
'total_time': health_results['total_time']
|
|
}
|
|
|
|
with open(report_path, 'w', encoding='utf-8') as f:
|
|
json.dump(report_data, f, indent=2)
|
|
|
|
self.logger.info(f"Health report saved to: {report_path}")
|
|
return str(report_path)
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Could not save health report: {e}")
|
|
return None |