From 7293e40ea2a1026b581fd1bc5364ee5407201b99 Mon Sep 17 00:00:00 2001 From: stoney420 Date: Sat, 28 Jun 2025 02:05:36 +0200 Subject: [PATCH] Update scripts/health_checker.py --- scripts/health_checker.py | 641 ++++++++++++++++++++++++++++++++++---- 1 file changed, 578 insertions(+), 63 deletions(-) diff --git a/scripts/health_checker.py b/scripts/health_checker.py index 27cf855..8e9713b 100644 --- a/scripts/health_checker.py +++ b/scripts/health_checker.py @@ -1,82 +1,597 @@ +#!/usr/bin/env python3 """ -Health Checker - Optional feature to check channel URL health +Repository Health Monitor - Keeps the repository clean and organized """ +import os +import shutil import logging -from concurrent.futures import ThreadPoolExecutor, as_completed +import json +from datetime import datetime, timedelta +from pathlib import Path from typing import Dict, List, Tuple -from urllib.request import Request, urlopen -from urllib.error import URLError, HTTPError +import subprocess +import gzip -class HealthChecker: - """Check channel URL health status.""" +class RepoHealthMonitor: + """Monitor and maintain repository cleanliness and organization.""" - def __init__(self, config): + def __init__(self, config=None): self.config = config self.logger = logging.getLogger(__name__) - self.timeout = config.settings.get('health_check_timeout', 5) - - def check_channel_health(self, url: str) -> Tuple[bool, str]: - """Check if a channel URL is accessible.""" - try: - req = Request(url, headers={'User-Agent': 'IPTV-Checker/1.0'}) - - with urlopen(req, timeout=self.timeout) as response: - status_code = response.getcode() - if status_code == 200: - return True, f"OK ({status_code})" - else: - return False, f"HTTP {status_code}" - - except HTTPError as e: - return False, f"HTTP {e.code}" - except URLError as e: - return False, f"Connection error: {str(e.reason)}" - except Exception as e: - return False, f"Error: {str(e)}" - - def batch_health_check(self, channels: List[Dict]) -> Dict[str, Tuple[bool, str]]: - """Perform health checks on multiple channels concurrently.""" - if not self.config.settings.get('enable_health_check', False): - self.logger.info("Health check disabled in settings") - return {} + self.root_path = Path.cwd() - self.logger.info("Starting batch health check...") - results = {} - max_workers = self.config.settings.get('max_workers', 4) - - def check_single(channel): - url = channel.get('Stream URL', '') - name = channel.get('Stream name', 'Unknown') - is_healthy, status = self.check_channel_health(url) - return name, (is_healthy, status) - - # Limit to first 100 channels for performance - channels_to_check = channels[:100] - - with ThreadPoolExecutor(max_workers=max_workers) as executor: - future_to_channel = { - executor.submit(check_single, channel): channel - for channel in channels_to_check + # Define cleanup rules + self.cleanup_rules = { + 'temp_files': { + 'patterns': ['*_temp*', '*.tmp', '*~', '*.backup.*'], + 'max_age_days': 1, + 'action': 'delete' + }, + 'old_logs': { + 'patterns': ['*.log'], + 'max_age_days': 7, + 'action': 'archive', + 'keep_recent': 5 + }, + 'old_backups': { + 'patterns': ['backups/*.txt'], + 'max_age_days': 30, + 'action': 'compress' + }, + 'large_files': { + 'max_size_mb': 50, + 'action': 'warn' + }, + 'python_cache': { + 'patterns': ['__pycache__', '*.pyc', '*.pyo'], + 'action': 'delete' } + } + + def run_health_check(self) -> Dict: + """Run comprehensive repository health check.""" + self.logger.info("๐Ÿ” Starting repository health check...") + + health_report = { + 
'timestamp': datetime.now().isoformat(),
+            'repository_size': self._calculate_repo_size(),
+            'file_counts': self._count_files_by_type(),
+            'issues_found': [],
+            'cleanup_suggestions': [],
+            'space_analysis': self._analyze_disk_usage(),
+            'organization_score': 0
+        }
+
+        # Check various aspects
+        health_report.update({
+            'temp_files': self._check_temp_files(),
+            'log_files': self._check_log_files(),
+            'backup_files': self._check_backup_files(),
+            'large_files': self._check_large_files(),
+            'python_artifacts': self._check_python_artifacts(),
+            'git_status': self._check_git_status()
+        })
+
+        # Calculate organization score
+        health_report['organization_score'] = self._calculate_organization_score(health_report)
+
+        # Generate suggestions
+        health_report['cleanup_suggestions'] = self._generate_cleanup_suggestions(health_report)
+
+        self.logger.info(f"📊 Health check complete. Organization score: {health_report['organization_score']}/100")
+        return health_report
+
+    def auto_cleanup(self, dry_run: bool = False) -> Dict:
+        """Automatically clean up the repository based on the cleanup rules."""
+        self.logger.info(f"🧹 Starting auto-cleanup (dry_run={dry_run})...")
+
+        # Each cleanup helper contributes its own result key (e.g.
+        # 'temp_files_deleted'), so only the shared keys are seeded here.
+        cleanup_results = {
+            'space_freed_mb': 0.0,
+            'errors': []
+        }
+
+        # Snapshot the tree size up front: the helpers only return file lists,
+        # so freed space is measured as the before/after difference.
+        size_before_mb = self._calculate_repo_size()['total_mb']
+
+        try:
+            # Clean temp files
+            cleanup_results.update(self._cleanup_temp_files(dry_run))
 
-        completed = 0
-        for future in as_completed(future_to_channel):
+            # Archive old logs
+            cleanup_results.update(self._archive_old_logs(dry_run))
+
+            # Compress old backups
+            cleanup_results.update(self._compress_old_backups(dry_run))
+
+            # Remove Python cache
+            cleanup_results.update(self._cleanup_python_cache(dry_run))
+
+            # Organize files
+            cleanup_results.update(self._organize_files(dry_run))
+
+        except Exception as e:
+            self.logger.error(f"Error during auto-cleanup: {e}")
+            cleanup_results['errors'].append(str(e))
+
+        if not dry_run:
+            size_after_mb = self._calculate_repo_size()['total_mb']
+            cleanup_results['space_freed_mb'] = max(0.0, size_before_mb - size_after_mb)
+
+        self.logger.info(f"✅ Auto-cleanup complete. Space freed: {cleanup_results['space_freed_mb']:.2f} MB")
+        return cleanup_results
+
+    def _calculate_repo_size(self) -> Dict:
+        """Calculate repository size breakdown."""
+        sizes = {
+            'total_mb': 0,
+            'by_directory': {},
+            'by_extension': {}
+        }
+
+        for root, dirs, files in os.walk(self.root_path):
+            # Prune .git in place so os.walk never descends into it
+            dirs[:] = [d for d in dirs if d != '.git']
+
+            dir_size = 0
+            for file in files:
+                file_path = Path(root) / file
                try:
-                    name, result = future.result()
-                    results[name] = result
-                    completed += 1
+                    file_size = file_path.stat().st_size
+                    dir_size += file_size
-                    if completed % 10 == 0:
-                        self.logger.info(f"Health check progress: {completed}/{len(channels_to_check)}")
+                    # Track by extension
+                    ext = file_path.suffix.lower()
+                    if ext:
+                        sizes['by_extension'][ext] = sizes['by_extension'].get(ext, 0) + file_size
+                except (OSError, FileNotFoundError):
+                    continue
+
+            if dir_size > 0:
+                rel_dir = str(Path(root).relative_to(self.root_path))
+                sizes['by_directory'][rel_dir] = dir_size / (1024 * 1024)  # Convert to MB
+                sizes['total_mb'] += dir_size / (1024 * 1024)
+
+        return sizes
+
+    def _count_files_by_type(self) -> Dict:
+        """Count files by type and directory."""
+        counts = {
+            'total_files': 0,
+            'by_extension': {},
+            'by_directory': {}
+        }
+
+        for root, dirs, files in os.walk(self.root_path):
+            dirs[:] = [d for d in dirs if d != '.git']
+
+            rel_dir = str(Path(root).relative_to(self.root_path))
+            counts['by_directory'][rel_dir] = len(files)
+            counts['total_files'] += len(files)
+
+            for file in files:
+                ext = Path(file).suffix.lower()
+                if ext:
+                    counts['by_extension'][ext] = counts['by_extension'].get(ext, 0) + 1
+
+        return counts
+
+    def _check_temp_files(self) -> Dict:
+        """Check for temporary files that should be cleaned."""
+        temp_files = []
+
+        for pattern in self.cleanup_rules['temp_files']['patterns']:
+            for file_path in self.root_path.rglob(pattern):
+                if file_path.is_file() and '.git' not in str(file_path):
+                    age_days = (datetime.now() - datetime.fromtimestamp(file_path.stat().st_mtime)).days
+                    temp_files.append({
+                        'path': str(file_path.relative_to(self.root_path)),
+                        'size_mb': file_path.stat().st_size / (1024 * 1024),
+                        'age_days': age_days
+                    })
+
+        return {
+            'count': len(temp_files),
+            'files': temp_files,
+            'total_size_mb': sum(f['size_mb'] for f in temp_files)
+        }
+
+    def _check_log_files(self) -> Dict:
+        """Check log file status and organization."""
+        log_files = []
+        reports_dir = self.root_path / 'reports'
+
+        # Check root log files
+        for log_file in self.root_path.glob('*.log'):
+            age_days = (datetime.now() - datetime.fromtimestamp(log_file.stat().st_mtime)).days
+            log_files.append({
+                'path': str(log_file.relative_to(self.root_path)),
+                'size_mb': log_file.stat().st_size / (1024 * 1024),
+                'age_days': age_days,
+                'location': 'root',
+                'should_move': True
+            })
+
+        # Check reports directory
+        if reports_dir.exists():
+            for log_file in reports_dir.rglob('*.log'):
+                age_days = (datetime.now() - datetime.fromtimestamp(log_file.stat().st_mtime)).days
+                log_files.append({
+                    'path': str(log_file.relative_to(self.root_path)),
+                    'size_mb': log_file.stat().st_size / (1024 * 1024),
+                    'age_days': age_days,
+                    'location': 'reports',
+                    'should_move': False
+                })
+
+        return {
+            'count': len(log_files),
+            'files': log_files,
+            'misplaced_count': sum(1 for f in log_files if f['should_move']),
+            'total_size_mb': sum(f['size_mb'] for f in log_files)
+        }
+
+    def _check_backup_files(self) -> Dict:
+        """Check backup file organization and compression opportunities."""
+        backups = []
+        backup_dir = self.root_path / 'backups'
+
+        if backup_dir.exists():
+            for backup_file in backup_dir.rglob('*'):
+                if backup_file.is_file():
+                    age_days = (datetime.now() - datetime.fromtimestamp(backup_file.stat().st_mtime)).days
+                    # Path.suffix is only the final component, so '.tar.gz'
+                    # could never match; '.gz' already covers tarballs
+                    is_compressed = backup_file.suffix in ['.gz', '.zip']
+
+                    backups.append({
+                        'path': str(backup_file.relative_to(self.root_path)),
+                        'size_mb': backup_file.stat().st_size / (1024 * 1024),
+                        'age_days': age_days,
+                        'is_compressed': is_compressed,
+                        'should_compress': age_days > 7 and not is_compressed
+                    })
+
+        return {
+            'count': len(backups),
+            'files': backups,
+            'compression_candidates': sum(1 for b in backups if b['should_compress']),
+            'total_size_mb': sum(b['size_mb'] for b in backups)
+        }
+
+    def _check_large_files(self) -> Dict:
+        """Check for unusually large files."""
+        large_files = []
+        max_size_bytes = self.cleanup_rules['large_files']['max_size_mb'] * 1024 * 1024
+
+        for root, dirs, files in os.walk(self.root_path):
+            dirs[:] = [d for d in dirs if d != '.git']
+
+            for file in files:
+                file_path = Path(root) / file
+                try:
+                    if file_path.stat().st_size > max_size_bytes:
+                        large_files.append({
+                            'path': str(file_path.relative_to(self.root_path)),
+                            'size_mb': file_path.stat().st_size / (1024 * 1024),
+                            'type': file_path.suffix.lower()
+                        })
+                except (OSError, FileNotFoundError):
+                    continue
+
+        return {
+            'count': len(large_files),
+            'files': large_files,
+            'total_size_mb': sum(f['size_mb'] for f in large_files)
+        }
+
+    def _check_python_artifacts(self) -> Dict:
+        """Check for Python cache and compiled files."""
+        artifacts = []
+
+        # Find __pycache__ directories
+        for pycache_dir in self.root_path.rglob('__pycache__'):
+            if pycache_dir.is_dir():
+                size = sum(f.stat().st_size for f in pycache_dir.rglob('*') if f.is_file())
+                artifacts.append({
+                    'path': str(pycache_dir.relative_to(self.root_path)),
+                    'type': 'directory',
+                    'size_mb': size / (1024 * 1024)
+                })
+
+        # Find .pyc and .pyo files
+        for pyc_file in self.root_path.rglob('*.py[co]'):
+            artifacts.append({
+                'path': str(pyc_file.relative_to(self.root_path)),
+                'type': 'file',
+                'size_mb': pyc_file.stat().st_size / (1024 * 1024)
+            })
+
+        return {
+            'count': len(artifacts),
+            'files': artifacts,
+            'total_size_mb': sum(a['size_mb'] for a in artifacts)
+        }
+
+    def _check_git_status(self) -> Dict:
+        """Check git repository status."""
+        try:
+            # Check for untracked and modified files
+            result = subprocess.run(['git', 'status', '--porcelain'],
+                                    capture_output=True, text=True,
+                                    cwd=self.root_path, check=True)
+
+            untracked = []
+            modified = []
+
+            for line in result.stdout.strip().split('\n'):
+                if line:
+                    status, filename = line[:2], line[3:]
+                    if status.strip() == '??':
+                        untracked.append(filename)
+                    elif status.strip():
+                        modified.append(filename)
+
+            return {
+                'untracked_files': untracked,
+                'modified_files': modified,
+                'is_clean': len(untracked) == 0 and len(modified) == 0
+            }
+        except (subprocess.CalledProcessError, FileNotFoundError):
+            # check=True raises CalledProcessError outside a work tree;
+            # FileNotFoundError means git itself is not installed
+            return {'error': 'Not a git repository or git not available'}
+
+    def _calculate_organization_score(self, health_report: Dict) -> int:
+        """Calculate a repository organization score (0-100)."""
+        score = 100
+
+        # Deduct points for issues
+        if health_report['temp_files']['count'] > 0:
+            score -= min(20, health_report['temp_files']['count'] * 2)
+
+        if health_report['log_files']['misplaced_count'] > 0:
+            score -= min(15, health_report['log_files']['misplaced_count'] * 5)
+
+        if health_report['backup_files']['compression_candidates'] > 0:
+            score -= min(10, health_report['backup_files']['compression_candidates'] * 3)
+
+        if 
health_report['python_artifacts']['count'] > 0: + score -= min(10, health_report['python_artifacts']['count']) + + if health_report['large_files']['count'] > 0: + score -= min(15, health_report['large_files']['count'] * 5) + + # Check git status + git_status = health_report.get('git_status', {}) + if not git_status.get('is_clean', True): + score -= 10 + + return max(0, score) + + def _generate_cleanup_suggestions(self, health_report: Dict) -> List[str]: + """Generate specific cleanup suggestions based on health report.""" + suggestions = [] + + if health_report['temp_files']['count'] > 0: + suggestions.append(f"๐Ÿ—‘๏ธ Remove {health_report['temp_files']['count']} temporary files ({health_report['temp_files']['total_size_mb']:.1f} MB)") + + if health_report['log_files']['misplaced_count'] > 0: + suggestions.append(f"๐Ÿ“ Move {health_report['log_files']['misplaced_count']} log files to reports/ directory") + + if health_report['backup_files']['compression_candidates'] > 0: + suggestions.append(f"๐Ÿ—œ๏ธ Compress {health_report['backup_files']['compression_candidates']} old backup files") + + if health_report['python_artifacts']['count'] > 0: + suggestions.append(f"๐Ÿ Remove Python cache artifacts ({health_report['python_artifacts']['total_size_mb']:.1f} MB)") + + if health_report['large_files']['count'] > 0: + suggestions.append(f"๐Ÿ“ Review {health_report['large_files']['count']} large files for archival") + + git_status = health_report.get('git_status', {}) + if git_status.get('untracked_files'): + suggestions.append(f"๐Ÿ“ Add {len(git_status['untracked_files'])} untracked files to .gitignore or commit them") + + return suggestions + + def _analyze_disk_usage(self) -> Dict: + """Analyze disk usage patterns.""" + try: + total, used, free = shutil.disk_usage(self.root_path) + return { + 'total_gb': total / (1024**3), + 'used_gb': used / (1024**3), + 'free_gb': free / (1024**3), + 'usage_percent': (used / total) * 100 + } + except Exception as e: + return {'error': str(e)} + + def _cleanup_temp_files(self, dry_run: bool) -> Dict: + """Clean up temporary files.""" + results = {'temp_files_deleted': []} + + for pattern in self.cleanup_rules['temp_files']['patterns']: + for file_path in self.root_path.rglob(pattern): + if file_path.is_file() and '.git' not in str(file_path): + if not dry_run: + try: + file_path.unlink() + results['temp_files_deleted'].append(str(file_path.relative_to(self.root_path))) + except Exception as e: + self.logger.warning(f"Could not delete {file_path}: {e}") + else: + results['temp_files_deleted'].append(str(file_path.relative_to(self.root_path))) + + return results + + def _archive_old_logs(self, dry_run: bool) -> Dict: + """Archive old log files.""" + results = {'logs_archived': []} + + # Create reports/logs directory if it doesn't exist + logs_dir = self.root_path / 'reports' / 'logs' + if not dry_run: + logs_dir.mkdir(parents=True, exist_ok=True) + + # Move log files from root to reports/logs + for log_file in self.root_path.glob('*.log'): + new_path = logs_dir / log_file.name + if not dry_run: + try: + shutil.move(str(log_file), str(new_path)) + results['logs_archived'].append(str(log_file.relative_to(self.root_path))) except Exception as e: - self.logger.warning(f"Health check failed: {e}") + self.logger.warning(f"Could not move {log_file}: {e}") + else: + results['logs_archived'].append(str(log_file.relative_to(self.root_path))) - healthy_count = sum(1 for is_healthy, _ in results.values() if is_healthy) - total_checked = len(results) - success_rate = 
(healthy_count / total_checked * 100) if total_checked > 0 else 0
+        return results
+
+    def _compress_old_backups(self, dry_run: bool) -> Dict:
+        """Compress old backup files."""
+        results = {'backups_compressed': []}
+        backup_dir = self.root_path / 'backups'
 
-        self.logger.info(f"Health check complete: {healthy_count}/{total_checked} channels healthy ({success_rate:.1f}%)")
+        if backup_dir.exists():
+            cutoff_date = datetime.now() - timedelta(days=7)
+
+            for backup_file in backup_dir.glob('*.txt'):
+                file_date = datetime.fromtimestamp(backup_file.stat().st_mtime)
+                if file_date < cutoff_date:
+                    if not dry_run:
+                        try:
+                            # Compress with gzip, then remove the original
+                            with open(backup_file, 'rb') as f_in:
+                                with gzip.open(f"{backup_file}.gz", 'wb') as f_out:
+                                    shutil.copyfileobj(f_in, f_out)
+                            backup_file.unlink()
+                            results['backups_compressed'].append(str(backup_file.relative_to(self.root_path)))
+                        except Exception as e:
+                            self.logger.warning(f"Could not compress {backup_file}: {e}")
+                    else:
+                        results['backups_compressed'].append(str(backup_file.relative_to(self.root_path)))
 
-        return results
\ No newline at end of file
+        return results
+
+    def _cleanup_python_cache(self, dry_run: bool) -> Dict:
+        """Remove Python cache files and directories."""
+        results = {'python_cache_removed': []}
+
+        # Remove __pycache__ directories
+        for pycache_dir in self.root_path.rglob('__pycache__'):
+            if pycache_dir.is_dir():
+                if not dry_run:
+                    try:
+                        shutil.rmtree(pycache_dir)
+                        results['python_cache_removed'].append(str(pycache_dir.relative_to(self.root_path)))
+                    except Exception as e:
+                        self.logger.warning(f"Could not remove {pycache_dir}: {e}")
+                else:
+                    results['python_cache_removed'].append(str(pycache_dir.relative_to(self.root_path)))
+
+        # Remove .pyc and .pyo files
+        for pyc_file in self.root_path.rglob('*.py[co]'):
+            if not dry_run:
+                try:
+                    pyc_file.unlink()
+                    results['python_cache_removed'].append(str(pyc_file.relative_to(self.root_path)))
+                except Exception as e:
+                    self.logger.warning(f"Could not remove {pyc_file}: {e}")
+            else:
+                results['python_cache_removed'].append(str(pyc_file.relative_to(self.root_path)))
+
+        return results
+
+    def _organize_files(self, dry_run: bool) -> Dict:
+        """Ensure the standard directory layout used by the other cleanup steps exists."""
+        results = {'directories_created': []}
+
+        # Create proper directory structure
+        directories = [
+            'reports/logs',
+            'reports/archive',
+            'backups/compressed',
+            'templates'
+        ]
+
+        for directory in directories:
+            dir_path = self.root_path / directory
+            if not dir_path.exists():
+                if not dry_run:
+                    dir_path.mkdir(parents=True, exist_ok=True)
+                results['directories_created'].append(directory)
+
+        return results
+
+    def save_health_report(self, health_report: Dict, filename: str = None) -> Path:
+        """Save the health report to reports/; returns the path, or None on failure."""
+        if filename is None:
+            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
+            filename = f'repo_health_{timestamp}.json'
+
+        reports_dir = self.root_path / 'reports'
+        reports_dir.mkdir(exist_ok=True)
+
+        report_path = reports_dir / filename
+
+        try:
+            with open(report_path, 'w', encoding='utf-8') as f:
+                json.dump(health_report, f, indent=2, default=str)
+
+            self.logger.info(f"Health report saved to: {report_path}")
+            return report_path
+        except Exception as e:
+            self.logger.error(f"Could not save health report: {e}")
+            return None
+
+
+def main():
+    """Command line interface for repository health monitoring."""
+    import argparse
+
+    parser = argparse.ArgumentParser(description='IPTV Repository Health Monitor')
+    parser.add_argument('--check', action='store_true', help='Run health check')
+    parser.add_argument('--cleanup', action='store_true', help='Run auto cleanup')
+    parser.add_argument('--dry-run', action='store_true', help='Dry run (no actual changes)')
+    parser.add_argument('--save-report', action='store_true', help='Save health report to file')
+
+    args = parser.parse_args()
+
+    # With no flags there is nothing to do, so print usage instead of exiting silently
+    if not (args.check or args.save_report or args.cleanup):
+        parser.print_help()
+        return
+
+    # Setup logging
+    logging.basicConfig(
+        level=logging.INFO,
+        format='[%(asctime)s] %(levelname)s: %(message)s'
+    )
+
+    monitor = RepoHealthMonitor()
+
+    if args.check or args.save_report:
+        health_report = monitor.run_health_check()
+
+        print("\n📊 Repository Health Report")
+        print(f"Organization Score: {health_report['organization_score']}/100")
+        print(f"Total Size: {health_report['repository_size']['total_mb']:.1f} MB")
+        print(f"Total Files: {health_report['file_counts']['total_files']}")
+
+        if health_report['cleanup_suggestions']:
+            print("\n🔧 Cleanup Suggestions:")
+            for suggestion in health_report['cleanup_suggestions']:
+                print(f"   {suggestion}")
+        else:
+            print("\n✅ Repository is well organized!")
+
+        if args.save_report:
+            monitor.save_health_report(health_report)
+
+    if args.cleanup:
+        cleanup_results = monitor.auto_cleanup(dry_run=args.dry_run)
+
+        if args.dry_run:
+            print("\n🧪 Dry Run Results:")
+        else:
+            print("\n🧹 Cleanup Results:")
+
+        for key, items in cleanup_results.items():
+            if isinstance(items, list) and items:
+                print(f"   {key}: {len(items)} items")
+                for item in items[:5]:  # Show first 5
+                    print(f"     - {item}")
+                if len(items) > 5:
+                    print(f"     ... and {len(items) - 5} more")
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
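
Reviewer note, not part of the patch: a minimal usage sketch for driving the monitor as a library rather than via the CLI. The import path is an assumption (scripts/ is not shown to be a package), so the sketch puts it on sys.path by hand; the class and method names match the diff above.

    import logging
    import sys
    from pathlib import Path

    # Assumption: run from the repository root, with the patched file at scripts/health_checker.py
    sys.path.insert(0, str(Path.cwd() / 'scripts'))
    from health_checker import RepoHealthMonitor

    logging.basicConfig(level=logging.INFO, format='[%(asctime)s] %(levelname)s: %(message)s')

    # RepoHealthMonitor scans Path.cwd(), so the working directory matters
    monitor = RepoHealthMonitor()

    # run_health_check() only reads the tree, so it is safe to call first
    report = monitor.run_health_check()
    print(f"Score: {report['organization_score']}/100")
    for suggestion in report['cleanup_suggestions']:
        print(suggestion)

    # Preview what auto_cleanup would touch, then apply it for real
    preview = monitor.auto_cleanup(dry_run=True)
    pending = [k for k, v in preview.items() if k != 'errors' and isinstance(v, list) and v]
    if pending:
        monitor.auto_cleanup(dry_run=False)

The equivalent CLI calls are `python scripts/health_checker.py --check --save-report` and `python scripts/health_checker.py --cleanup --dry-run`.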
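A second note on the compression step: _compress_old_backups streams each stale .txt backup through gzip and then deletes the original. A quick round-trip sketch of that pattern, with a hypothetical file name:

    import gzip
    import shutil
    from pathlib import Path

    src = Path('backups/playlist_backup.txt')  # hypothetical backup file
    dst = Path(f"{src}.gz")

    # Same pattern as the patch: stream-copy into a gzip member, then drop the original
    with open(src, 'rb') as f_in, gzip.open(dst, 'wb') as f_out:
        shutil.copyfileobj(f_in, f_out)
    src.unlink()

    # Restoring is the mirror image; gzip.open in text mode handles decoding on read
    with gzip.open(dst, 'rt', encoding='utf-8') as f:
        first_line = f.readline()

Streaming via shutil.copyfileobj keeps memory use flat regardless of backup size, which is why the patch avoids read()/write() of the whole file.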