Update scripts/health_checker.py

This commit is contained in:
stoney420 2025-06-28 02:05:36 +02:00
parent c3f21dbce8
commit 7293e40ea2


@@ -1,82 +1,597 @@
#!/usr/bin/env python3
"""
Repository Health Monitor - Keeps the repository clean and organized
"""
import os
import shutil
import logging
import json
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List
import subprocess
import gzip
class RepoHealthMonitor:
    """Monitor and maintain repository cleanliness and organization."""

    def __init__(self, config=None):
        self.config = config
        self.logger = logging.getLogger(__name__)
        self.root_path = Path.cwd()
# Define cleanup rules
self.cleanup_rules = {
'temp_files': {
'patterns': ['*_temp*', '*.tmp', '*~', '*.backup.*'],
'max_age_days': 1,
'action': 'delete'
},
'old_logs': {
'patterns': ['*.log'],
'max_age_days': 7,
'action': 'archive',
'keep_recent': 5
},
'old_backups': {
'patterns': ['backups/*.txt'],
'max_age_days': 30,
'action': 'compress'
},
'large_files': {
'max_size_mb': 50,
'action': 'warn'
},
'python_cache': {
'patterns': ['__pycache__', '*.pyc', '*.pyo'],
'action': 'delete'
}
}
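        # The 'patterns' entries are glob patterns resolved against the repository
        # root; auto_cleanup() applies the delete/archive/compress actions, while
        # 'warn' rules are only surfaced through run_health_check() suggestions.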
def run_health_check(self) -> Dict:
"""Run comprehensive repository health check."""
self.logger.info("🔍 Starting repository health check...")
health_report = {
'timestamp': datetime.now().isoformat(),
'repository_size': self._calculate_repo_size(),
'file_counts': self._count_files_by_type(),
'issues_found': [],
'cleanup_suggestions': [],
'space_analysis': self._analyze_disk_usage(),
'organization_score': 0
}
# Check various aspects
health_report.update({
'temp_files': self._check_temp_files(),
'log_files': self._check_log_files(),
'backup_files': self._check_backup_files(),
'large_files': self._check_large_files(),
'python_artifacts': self._check_python_artifacts(),
'git_status': self._check_git_status()
})
# Calculate organization score
health_report['organization_score'] = self._calculate_organization_score(health_report)
# Generate suggestions
health_report['cleanup_suggestions'] = self._generate_cleanup_suggestions(health_report)
self.logger.info(f"📊 Health check complete. Organization score: {health_report['organization_score']}/100")
return health_report
def auto_cleanup(self, dry_run: bool = False) -> Dict:
"""Automatically clean up repository based on rules."""
self.logger.info(f"🧹 Starting auto-cleanup (dry_run={dry_run})...")
cleanup_results = {
'files_deleted': [],
'files_archived': [],
'files_compressed': [],
'space_freed_mb': 0,
'errors': []
}
try:
# Clean temp files
cleanup_results.update(self._cleanup_temp_files(dry_run))
# Archive old logs
cleanup_results.update(self._archive_old_logs(dry_run))
# Compress old backups
cleanup_results.update(self._compress_old_backups(dry_run))
# Remove Python cache
cleanup_results.update(self._cleanup_python_cache(dry_run))
# Organize files
cleanup_results.update(self._organize_files(dry_run))
except Exception as e:
self.logger.error(f"Error during auto-cleanup: {e}")
cleanup_results['errors'].append(str(e))
self.logger.info(f"✅ Auto-cleanup complete. Space freed: {cleanup_results['space_freed_mb']:.2f} MB")
return cleanup_results
def _calculate_repo_size(self) -> Dict:
"""Calculate repository size breakdown."""
sizes = {
'total_mb': 0,
'by_directory': {},
'by_extension': {}
}
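        # By the end of the walk, 'total_mb' and 'by_directory' hold megabytes,
        # while 'by_extension' accumulates raw byte counts.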
for root, dirs, files in os.walk(self.root_path):
# Skip .git directory
if '.git' in root:
continue
dir_size = 0
for file in files:
file_path = Path(root) / file
try:
file_size = file_path.stat().st_size
dir_size += file_size
# Track by extension
ext = file_path.suffix.lower()
if ext:
sizes['by_extension'][ext] = sizes['by_extension'].get(ext, 0) + file_size
except (OSError, FileNotFoundError):
continue
if dir_size > 0:
rel_dir = str(Path(root).relative_to(self.root_path))
sizes['by_directory'][rel_dir] = dir_size / (1024 * 1024) # Convert to MB
sizes['total_mb'] += dir_size / (1024 * 1024)
return sizes
def _count_files_by_type(self) -> Dict:
"""Count files by type and directory."""
counts = {
'total_files': 0,
'by_extension': {},
'by_directory': {}
}
for root, dirs, files in os.walk(self.root_path):
if '.git' in root:
continue
rel_dir = str(Path(root).relative_to(self.root_path))
counts['by_directory'][rel_dir] = len(files)
counts['total_files'] += len(files)
for file in files:
ext = Path(file).suffix.lower()
if ext:
counts['by_extension'][ext] = counts['by_extension'].get(ext, 0) + 1
return counts
def _check_temp_files(self) -> Dict:
"""Check for temporary files that should be cleaned."""
temp_files = []
for pattern in self.cleanup_rules['temp_files']['patterns']:
for file_path in self.root_path.rglob(pattern):
if file_path.is_file() and '.git' not in str(file_path):
age_days = (datetime.now() - datetime.fromtimestamp(file_path.stat().st_mtime)).days
temp_files.append({
'path': str(file_path.relative_to(self.root_path)),
'size_mb': file_path.stat().st_size / (1024 * 1024),
'age_days': age_days
})
return {
'count': len(temp_files),
'files': temp_files,
'total_size_mb': sum(f['size_mb'] for f in temp_files)
}
def _check_log_files(self) -> Dict:
"""Check log file status and organization."""
log_files = []
reports_dir = self.root_path / 'reports'
# Check root log files
for log_file in self.root_path.glob('*.log'):
age_days = (datetime.now() - datetime.fromtimestamp(log_file.stat().st_mtime)).days
log_files.append({
'path': str(log_file.relative_to(self.root_path)),
'size_mb': log_file.stat().st_size / (1024 * 1024),
'age_days': age_days,
'location': 'root',
'should_move': True
})
# Check reports directory
if reports_dir.exists():
for log_file in reports_dir.rglob('*.log'):
age_days = (datetime.now() - datetime.fromtimestamp(log_file.stat().st_mtime)).days
log_files.append({
'path': str(log_file.relative_to(self.root_path)),
'size_mb': log_file.stat().st_size / (1024 * 1024),
'age_days': age_days,
'location': 'reports',
'should_move': False
})
return {
'count': len(log_files),
'files': log_files,
'misplaced_count': sum(1 for f in log_files if f['should_move']),
'total_size_mb': sum(f['size_mb'] for f in log_files)
}
def _check_backup_files(self) -> Dict:
"""Check backup file organization and compression opportunities."""
backups = []
backup_dir = self.root_path / 'backups'
if backup_dir.exists():
for backup_file in backup_dir.rglob('*'):
if backup_file.is_file():
age_days = (datetime.now() - datetime.fromtimestamp(backup_file.stat().st_mtime)).days
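                    # Path.suffix only captures the final extension, so '.tar.gz'
                    # backups are effectively matched via the '.gz' entry below.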
is_compressed = backup_file.suffix in ['.gz', '.zip', '.tar.gz']
backups.append({
'path': str(backup_file.relative_to(self.root_path)),
'size_mb': backup_file.stat().st_size / (1024 * 1024),
'age_days': age_days,
'is_compressed': is_compressed,
'should_compress': age_days > 7 and not is_compressed
})
return {
'count': len(backups),
'files': backups,
'compression_candidates': sum(1 for b in backups if b['should_compress']),
'total_size_mb': sum(b['size_mb'] for b in backups)
}
def _check_large_files(self) -> Dict:
"""Check for unusually large files."""
large_files = []
max_size_bytes = self.cleanup_rules['large_files']['max_size_mb'] * 1024 * 1024
for root, dirs, files in os.walk(self.root_path):
if '.git' in root:
continue
for file in files:
file_path = Path(root) / file
try:
if file_path.stat().st_size > max_size_bytes:
large_files.append({
'path': str(file_path.relative_to(self.root_path)),
'size_mb': file_path.stat().st_size / (1024 * 1024),
'type': file_path.suffix.lower()
})
except (OSError, FileNotFoundError):
continue
return {
'count': len(large_files),
'files': large_files,
'total_size_mb': sum(f['size_mb'] for f in large_files)
}
def _check_python_artifacts(self) -> Dict:
"""Check for Python cache and compiled files."""
artifacts = []
# Find __pycache__ directories
for pycache_dir in self.root_path.rglob('__pycache__'):
if pycache_dir.is_dir():
size = sum(f.stat().st_size for f in pycache_dir.rglob('*') if f.is_file())
artifacts.append({
'path': str(pycache_dir.relative_to(self.root_path)),
'type': 'directory',
'size_mb': size / (1024 * 1024)
})
# Find .pyc and .pyo files
for pyc_file in self.root_path.rglob('*.py[co]'):
artifacts.append({
'path': str(pyc_file.relative_to(self.root_path)),
'type': 'file',
'size_mb': pyc_file.stat().st_size / (1024 * 1024)
})
return {
'count': len(artifacts),
'files': artifacts,
'total_size_mb': sum(a['size_mb'] for a in artifacts)
}
def _check_git_status(self) -> Dict:
"""Check git repository status."""
try:
# Check for untracked files
result = subprocess.run(['git', 'status', '--porcelain'],
capture_output=True, text=True, cwd=self.root_path)
untracked = []
modified = []
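            # Each porcelain line is "XY <path>": a two-character status code,
            # a space, then the file path, e.g. "?? new_file.txt" or " M main.py".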
for line in result.stdout.strip().split('\n'):
if line:
status, filename = line[:2], line[3:]
if status.strip() == '??':
untracked.append(filename)
elif status.strip():
modified.append(filename)
return {
'untracked_files': untracked,
'modified_files': modified,
'is_clean': len(untracked) == 0 and len(modified) == 0
}
        except (subprocess.CalledProcessError, FileNotFoundError):
            return {'error': 'Not a git repository or git not available'}
def _calculate_organization_score(self, health_report: Dict) -> int:
"""Calculate a repository organization score (0-100)."""
score = 100
# Deduct points for issues
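        # Worked example: 3 temp files (-6), one misplaced log file (-5), and a
        # dirty git tree (-10) would leave a score of 79.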
if health_report['temp_files']['count'] > 0:
score -= min(20, health_report['temp_files']['count'] * 2)
if health_report['log_files']['misplaced_count'] > 0:
score -= min(15, health_report['log_files']['misplaced_count'] * 5)
if health_report['backup_files']['compression_candidates'] > 0:
score -= min(10, health_report['backup_files']['compression_candidates'] * 3)
if health_report['python_artifacts']['count'] > 0:
score -= min(10, health_report['python_artifacts']['count'])
if health_report['large_files']['count'] > 0:
score -= min(15, health_report['large_files']['count'] * 5)
# Check git status
git_status = health_report.get('git_status', {})
if not git_status.get('is_clean', True):
score -= 10
return max(0, score)
def _generate_cleanup_suggestions(self, health_report: Dict) -> List[str]:
"""Generate specific cleanup suggestions based on health report."""
suggestions = []
if health_report['temp_files']['count'] > 0:
suggestions.append(f"🗑️ Remove {health_report['temp_files']['count']} temporary files ({health_report['temp_files']['total_size_mb']:.1f} MB)")
if health_report['log_files']['misplaced_count'] > 0:
suggestions.append(f"📁 Move {health_report['log_files']['misplaced_count']} log files to reports/ directory")
if health_report['backup_files']['compression_candidates'] > 0:
suggestions.append(f"🗜️ Compress {health_report['backup_files']['compression_candidates']} old backup files")
if health_report['python_artifacts']['count'] > 0:
suggestions.append(f"🐍 Remove Python cache artifacts ({health_report['python_artifacts']['total_size_mb']:.1f} MB)")
if health_report['large_files']['count'] > 0:
suggestions.append(f"📏 Review {health_report['large_files']['count']} large files for archival")
git_status = health_report.get('git_status', {})
if git_status.get('untracked_files'):
suggestions.append(f"📝 Add {len(git_status['untracked_files'])} untracked files to .gitignore or commit them")
return suggestions
def _analyze_disk_usage(self) -> Dict:
"""Analyze disk usage patterns."""
try:
total, used, free = shutil.disk_usage(self.root_path)
return {
'total_gb': total / (1024**3),
'used_gb': used / (1024**3),
'free_gb': free / (1024**3),
'usage_percent': (used / total) * 100
}
except Exception as e:
return {'error': str(e)}
def _cleanup_temp_files(self, dry_run: bool) -> Dict:
"""Clean up temporary files."""
results = {'temp_files_deleted': []}
for pattern in self.cleanup_rules['temp_files']['patterns']:
for file_path in self.root_path.rglob(pattern):
if file_path.is_file() and '.git' not in str(file_path):
if not dry_run:
try:
file_path.unlink()
results['temp_files_deleted'].append(str(file_path.relative_to(self.root_path)))
except Exception as e:
self.logger.warning(f"Could not delete {file_path}: {e}")
else:
results['temp_files_deleted'].append(str(file_path.relative_to(self.root_path)))
return results
def _archive_old_logs(self, dry_run: bool) -> Dict:
"""Archive old log files."""
results = {'logs_archived': []}
# Create reports/logs directory if it doesn't exist
logs_dir = self.root_path / 'reports' / 'logs'
if not dry_run:
logs_dir.mkdir(parents=True, exist_ok=True)
# Move log files from root to reports/logs
for log_file in self.root_path.glob('*.log'):
new_path = logs_dir / log_file.name
if not dry_run:
try:
shutil.move(str(log_file), str(new_path))
results['logs_archived'].append(str(log_file.relative_to(self.root_path)))
except Exception as e:
self.logger.warning(f"Could not move {log_file}: {e}")
else:
results['logs_archived'].append(str(log_file.relative_to(self.root_path)))
return results
def _compress_old_backups(self, dry_run: bool) -> Dict:
"""Compress old backup files."""
results = {'backups_compressed': []}
backup_dir = self.root_path / 'backups'
if backup_dir.exists():
cutoff_date = datetime.now() - timedelta(days=7)
for backup_file in backup_dir.glob('*.txt'):
file_date = datetime.fromtimestamp(backup_file.stat().st_mtime)
if file_date < cutoff_date:
if not dry_run:
try:
# Compress with gzip
with open(backup_file, 'rb') as f_in:
with gzip.open(f"{backup_file}.gz", 'wb') as f_out:
shutil.copyfileobj(f_in, f_out)
backup_file.unlink()
results['backups_compressed'].append(str(backup_file.relative_to(self.root_path)))
except Exception as e:
self.logger.warning(f"Could not compress {backup_file}: {e}")
else:
results['backups_compressed'].append(str(backup_file.relative_to(self.root_path)))
        return results
def _cleanup_python_cache(self, dry_run: bool) -> Dict:
"""Remove Python cache files and directories."""
results = {'python_cache_removed': []}
# Remove __pycache__ directories
for pycache_dir in self.root_path.rglob('__pycache__'):
if pycache_dir.is_dir():
if not dry_run:
try:
shutil.rmtree(pycache_dir)
results['python_cache_removed'].append(str(pycache_dir.relative_to(self.root_path)))
except Exception as e:
self.logger.warning(f"Could not remove {pycache_dir}: {e}")
else:
results['python_cache_removed'].append(str(pycache_dir.relative_to(self.root_path)))
# Remove .pyc and .pyo files
for pyc_file in self.root_path.rglob('*.py[co]'):
if not dry_run:
try:
pyc_file.unlink()
results['python_cache_removed'].append(str(pyc_file.relative_to(self.root_path)))
except Exception as e:
self.logger.warning(f"Could not remove {pyc_file}: {e}")
else:
results['python_cache_removed'].append(str(pyc_file.relative_to(self.root_path)))
return results
def _organize_files(self, dry_run: bool) -> Dict:
"""Organize files into proper directories."""
results = {'files_organized': []}
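        # This step only ensures the directory skeleton below exists; no files are
        # moved here, so 'files_organized' is currently left empty.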
# Create proper directory structure
directories = [
'reports/logs',
'reports/archive',
'backups/compressed',
'templates'
]
if not dry_run:
for directory in directories:
(self.root_path / directory).mkdir(parents=True, exist_ok=True)
return results
def save_health_report(self, health_report: Dict, filename: str = None) -> Path:
"""Save health report to file."""
if filename is None:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
filename = f'repo_health_{timestamp}.json'
reports_dir = self.root_path / 'reports'
reports_dir.mkdir(exist_ok=True)
report_path = reports_dir / filename
try:
with open(report_path, 'w', encoding='utf-8') as f:
json.dump(health_report, f, indent=2, default=str)
self.logger.info(f"Health report saved to: {report_path}")
return report_path
except Exception as e:
self.logger.error(f"Could not save health report: {e}")
return None
def main():
"""Command line interface for repository health monitoring."""
import argparse
parser = argparse.ArgumentParser(description='IPTV Repository Health Monitor')
parser.add_argument('--check', action='store_true', help='Run health check')
parser.add_argument('--cleanup', action='store_true', help='Run auto cleanup')
parser.add_argument('--dry-run', action='store_true', help='Dry run (no actual changes)')
parser.add_argument('--save-report', action='store_true', help='Save health report to file')
args = parser.parse_args()
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='[%(asctime)s] %(levelname)s: %(message)s'
)
monitor = RepoHealthMonitor()
if args.check or args.save_report:
health_report = monitor.run_health_check()
print(f"\n📊 Repository Health Report")
print(f"Organization Score: {health_report['organization_score']}/100")
print(f"Total Size: {health_report['repository_size']['total_mb']:.1f} MB")
print(f"Total Files: {health_report['file_counts']['total_files']}")
if health_report['cleanup_suggestions']:
print("\n🔧 Cleanup Suggestions:")
for suggestion in health_report['cleanup_suggestions']:
print(f" {suggestion}")
else:
print("\n✅ Repository is well organized!")
if args.save_report:
monitor.save_health_report(health_report)
if args.cleanup:
cleanup_results = monitor.auto_cleanup(dry_run=args.dry_run)
if args.dry_run:
print("\n🧪 Dry Run Results:")
else:
print("\n🧹 Cleanup Results:")
for key, items in cleanup_results.items():
if isinstance(items, list) and items:
print(f" {key}: {len(items)} items")
for item in items[:5]: # Show first 5
print(f" - {item}")
if len(items) > 5:
print(f" ... and {len(items) - 5} more")
if __name__ == "__main__":
main()
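
# Example usage (a sketch based on the flags defined in main()):
#   python scripts/health_checker.py --check --save-report
#   python scripts/health_checker.py --cleanup --dry-run
#
# Programmatic use mirrors the same flow:
#   monitor = RepoHealthMonitor()
#   report = monitor.run_health_check()
#   monitor.save_health_report(report)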