#!/usr/bin/env python3
"""
Repository Health Monitor - keeps the repository clean and organized.
"""

import gzip
import json
import logging
import os
import shutil
import subprocess
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, List, Optional


class RepoHealthMonitor:
    """Monitor and maintain repository cleanliness and organization."""

    def __init__(self, config=None):
        self.config = config
        self.logger = logging.getLogger(__name__)
        self.root_path = Path.cwd()

        # Declarative cleanup rules: each entry names the files it applies to
        # and the action to take on them.
        self.cleanup_rules = {
            'temp_files': {
                'patterns': ['*_temp*', '*.tmp', '*~', '*.backup.*'],
                'max_age_days': 1,
                'action': 'delete'
            },
            'old_logs': {
                'patterns': ['*.log'],
                'max_age_days': 7,
                'action': 'archive',
                'keep_recent': 5
            },
            'old_backups': {
                'patterns': ['backups/*.txt'],
                'max_age_days': 30,
                'action': 'compress'
            },
            'large_files': {
                'max_size_mb': 50,
                'action': 'warn'
            },
            'python_cache': {
                'patterns': ['__pycache__', '*.pyc', '*.pyo'],
                'action': 'delete'
            }
        }

    def run_health_check(self) -> Dict:
        """Run a comprehensive repository health check."""
        self.logger.info("🔍 Starting repository health check...")

        health_report = {
            'timestamp': datetime.now().isoformat(),
            'repository_size': self._calculate_repo_size(),
            'file_counts': self._count_files_by_type(),
            'issues_found': [],
            'cleanup_suggestions': [],
            'space_analysis': self._analyze_disk_usage(),
            'organization_score': 0
        }

        # Check the individual health aspects
        health_report.update({
            'temp_files': self._check_temp_files(),
            'log_files': self._check_log_files(),
            'backup_files': self._check_backup_files(),
            'large_files': self._check_large_files(),
            'python_artifacts': self._check_python_artifacts(),
            'git_status': self._check_git_status()
        })

        # Calculate the organization score, then derive suggestions from it
        health_report['organization_score'] = self._calculate_organization_score(health_report)
        health_report['cleanup_suggestions'] = self._generate_cleanup_suggestions(health_report)

        self.logger.info(
            f"📊 Health check complete. "
            f"Organization score: {health_report['organization_score']}/100"
        )
        return health_report

    def auto_cleanup(self, dry_run: bool = False) -> Dict:
        """Automatically clean up the repository based on the cleanup rules."""
        self.logger.info(f"🧹 Starting auto-cleanup (dry_run={dry_run})...")

        cleanup_results = {
            'files_deleted': [],
            'files_archived': [],
            'files_compressed': [],
            'space_freed_mb': 0.0,
            'errors': []
        }

        try:
            for step in (
                self._cleanup_temp_files,      # delete temp files
                self._archive_old_logs,        # move root logs to reports/logs
                self._compress_old_backups,    # gzip old backups
                self._cleanup_python_cache,    # remove __pycache__ and .pyc/.pyo
                self._organize_files,          # ensure directory structure
            ):
                partial = step(dry_run)
                # Sum 'space_freed_mb' explicitly; a plain dict.update() would
                # overwrite it and the total would always stay at zero.
                cleanup_results['space_freed_mb'] += partial.pop('space_freed_mb', 0.0)
                cleanup_results.update(partial)
        except Exception as e:
            self.logger.error(f"Error during auto-cleanup: {e}")
            cleanup_results['errors'].append(str(e))

        self.logger.info(
            f"✅ Auto-cleanup complete. "
            f"Space freed: {cleanup_results['space_freed_mb']:.2f} MB"
        )
        return cleanup_results
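    # A minimal usage sketch (illustrative only; the 80 threshold is an
    # arbitrary example, not part of the class):
    #
    #     monitor = RepoHealthMonitor()
    #     report = monitor.run_health_check()
    #     if report['organization_score'] < 80:
    #         monitor.auto_cleanup(dry_run=True)  # preview what would change
    #         monitor.auto_cleanup()              # then actually clean up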
    def _calculate_repo_size(self) -> Dict:
        """Calculate a repository size breakdown in MB."""
        sizes = {
            'total_mb': 0,
            'by_directory': {},
            'by_extension': {}
        }

        for root, dirs, files in os.walk(self.root_path):
            # Prune .git in place so os.walk never descends into it; a
            # substring test on the path would also skip e.g. .github.
            dirs[:] = [d for d in dirs if d != '.git']

            dir_size = 0
            for file in files:
                file_path = Path(root) / file
                try:
                    file_size = file_path.stat().st_size
                    dir_size += file_size

                    # Track by extension
                    ext = file_path.suffix.lower()
                    if ext:
                        sizes['by_extension'][ext] = sizes['by_extension'].get(ext, 0) + file_size
                except (OSError, FileNotFoundError):
                    continue

            if dir_size > 0:
                rel_dir = str(Path(root).relative_to(self.root_path))
                sizes['by_directory'][rel_dir] = dir_size / (1024 * 1024)  # bytes -> MB
                sizes['total_mb'] += dir_size / (1024 * 1024)

        return sizes

    def _count_files_by_type(self) -> Dict:
        """Count files by extension and by directory."""
        counts = {
            'total_files': 0,
            'by_extension': {},
            'by_directory': {}
        }

        for root, dirs, files in os.walk(self.root_path):
            dirs[:] = [d for d in dirs if d != '.git']

            rel_dir = str(Path(root).relative_to(self.root_path))
            counts['by_directory'][rel_dir] = len(files)
            counts['total_files'] += len(files)

            for file in files:
                ext = Path(file).suffix.lower()
                if ext:
                    counts['by_extension'][ext] = counts['by_extension'].get(ext, 0) + 1

        return counts

    def _check_temp_files(self) -> Dict:
        """Check for temporary files that should be cleaned."""
        temp_files = []

        for pattern in self.cleanup_rules['temp_files']['patterns']:
            for file_path in self.root_path.rglob(pattern):
                if file_path.is_file() and '.git' not in file_path.parts:
                    age_days = (datetime.now() - datetime.fromtimestamp(file_path.stat().st_mtime)).days
                    temp_files.append({
                        'path': str(file_path.relative_to(self.root_path)),
                        'size_mb': file_path.stat().st_size / (1024 * 1024),
                        'age_days': age_days
                    })

        return {
            'count': len(temp_files),
            'files': temp_files,
            'total_size_mb': sum(f['size_mb'] for f in temp_files)
        }

    def _check_log_files(self) -> Dict:
        """Check log file status and organization."""
        log_files = []
        reports_dir = self.root_path / 'reports'

        # Log files in the repository root should live under reports/ instead
        for log_file in self.root_path.glob('*.log'):
            age_days = (datetime.now() - datetime.fromtimestamp(log_file.stat().st_mtime)).days
            log_files.append({
                'path': str(log_file.relative_to(self.root_path)),
                'size_mb': log_file.stat().st_size / (1024 * 1024),
                'age_days': age_days,
                'location': 'root',
                'should_move': True
            })

        # Log files already under reports/ are where they belong
        if reports_dir.exists():
            for log_file in reports_dir.rglob('*.log'):
                age_days = (datetime.now() - datetime.fromtimestamp(log_file.stat().st_mtime)).days
                log_files.append({
                    'path': str(log_file.relative_to(self.root_path)),
                    'size_mb': log_file.stat().st_size / (1024 * 1024),
                    'age_days': age_days,
                    'location': 'reports',
                    'should_move': False
                })

        return {
            'count': len(log_files),
            'files': log_files,
            'misplaced_count': sum(1 for f in log_files if f['should_move']),
            'total_size_mb': sum(f['size_mb'] for f in log_files)
        }
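    # Note: every _check_* helper returns a small dict fragment that
    # run_health_check() merges into the report. Illustrative shape (the
    # values here are made up):
    #
    #     {'count': 2,
    #      'files': [{'path': 'backups/old.txt', 'size_mb': 1.2, ...}, ...],
    #      'total_size_mb': 2.4}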
    def _check_backup_files(self) -> Dict:
        """Check backup file organization and compression opportunities."""
        backups = []
        backup_dir = self.root_path / 'backups'

        if backup_dir.exists():
            for backup_file in backup_dir.rglob('*'):
                if backup_file.is_file():
                    age_days = (datetime.now() - datetime.fromtimestamp(backup_file.stat().st_mtime)).days
                    # Path.suffix is only the final extension, so checking for
                    # '.gz' also covers '.tar.gz'.
                    is_compressed = backup_file.suffix in ('.gz', '.zip')

                    backups.append({
                        'path': str(backup_file.relative_to(self.root_path)),
                        'size_mb': backup_file.stat().st_size / (1024 * 1024),
                        'age_days': age_days,
                        'is_compressed': is_compressed,
                        'should_compress': age_days > 7 and not is_compressed
                    })

        return {
            'count': len(backups),
            'files': backups,
            'compression_candidates': sum(1 for b in backups if b['should_compress']),
            'total_size_mb': sum(b['size_mb'] for b in backups)
        }

    def _check_large_files(self) -> Dict:
        """Check for unusually large files."""
        large_files = []
        max_size_bytes = self.cleanup_rules['large_files']['max_size_mb'] * 1024 * 1024

        for root, dirs, files in os.walk(self.root_path):
            dirs[:] = [d for d in dirs if d != '.git']

            for file in files:
                file_path = Path(root) / file
                try:
                    size = file_path.stat().st_size
                    if size > max_size_bytes:
                        large_files.append({
                            'path': str(file_path.relative_to(self.root_path)),
                            'size_mb': size / (1024 * 1024),
                            'type': file_path.suffix.lower()
                        })
                except (OSError, FileNotFoundError):
                    continue

        return {
            'count': len(large_files),
            'files': large_files,
            'total_size_mb': sum(f['size_mb'] for f in large_files)
        }

    def _check_python_artifacts(self) -> Dict:
        """Check for Python cache directories and compiled files."""
        artifacts = []

        # Find __pycache__ directories
        for pycache_dir in self.root_path.rglob('__pycache__'):
            if pycache_dir.is_dir():
                size = sum(f.stat().st_size for f in pycache_dir.rglob('*') if f.is_file())
                artifacts.append({
                    'path': str(pycache_dir.relative_to(self.root_path)),
                    'type': 'directory',
                    'size_mb': size / (1024 * 1024)
                })

        # The [co] character class matches both .pyc and .pyo files
        for pyc_file in self.root_path.rglob('*.py[co]'):
            artifacts.append({
                'path': str(pyc_file.relative_to(self.root_path)),
                'type': 'file',
                'size_mb': pyc_file.stat().st_size / (1024 * 1024)
            })

        return {
            'count': len(artifacts),
            'files': artifacts,
            'total_size_mb': sum(a['size_mb'] for a in artifacts)
        }

    def _check_git_status(self) -> Dict:
        """Check the git working-tree status."""
        try:
            result = subprocess.run(['git', 'status', '--porcelain'],
                                    capture_output=True, text=True, cwd=self.root_path)
            if result.returncode != 0:
                return {'error': 'Not a git repository or git not available'}

            untracked = []
            modified = []
            # Porcelain format: two status characters, a space, then the path
            for line in result.stdout.strip().split('\n'):
                if line:
                    status, filename = line[:2], line[3:]
                    if status.strip() == '??':
                        untracked.append(filename)
                    elif status.strip():
                        modified.append(filename)

            return {
                'untracked_files': untracked,
                'modified_files': modified,
                'is_clean': len(untracked) == 0 and len(modified) == 0
            }
        except (subprocess.CalledProcessError, FileNotFoundError):
            # FileNotFoundError covers a missing git binary; run() itself does
            # not raise CalledProcessError here because check=True is not set.
            return {'error': 'Not a git repository or git not available'}

    def _calculate_organization_score(self, health_report: Dict) -> int:
        """Calculate a repository organization score (0-100)."""
        score = 100

        # Deduct a capped penalty per issue category
        if health_report['temp_files']['count'] > 0:
            score -= min(20, health_report['temp_files']['count'] * 2)
        if health_report['log_files']['misplaced_count'] > 0:
            score -= min(15, health_report['log_files']['misplaced_count'] * 5)
        if health_report['backup_files']['compression_candidates'] > 0:
            score -= min(10, health_report['backup_files']['compression_candidates'] * 3)
        if health_report['python_artifacts']['count'] > 0:
            score -= min(10, health_report['python_artifacts']['count'])
        if health_report['large_files']['count'] > 0:
            score -= min(15, health_report['large_files']['count'] * 5)

        # An unclean git working tree costs a flat 10 points
        git_status = health_report.get('git_status', {})
        if not git_status.get('is_clean', True):
            score -= 10

        return max(0, score)
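    # Worked example of the scoring above (illustrative numbers): 3 temp files
    # (-6), 2 misplaced logs (-10), 1 compression candidate (-3), and a dirty
    # git tree (-10) give 100 - 29 = 71/100.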
    def _generate_cleanup_suggestions(self, health_report: Dict) -> List[str]:
        """Generate specific cleanup suggestions based on the health report."""
        suggestions = []

        if health_report['temp_files']['count'] > 0:
            suggestions.append(
                f"🗑️ Remove {health_report['temp_files']['count']} temporary files "
                f"({health_report['temp_files']['total_size_mb']:.1f} MB)")

        if health_report['log_files']['misplaced_count'] > 0:
            suggestions.append(
                f"📁 Move {health_report['log_files']['misplaced_count']} log files to the reports/ directory")

        if health_report['backup_files']['compression_candidates'] > 0:
            suggestions.append(
                f"🗜️ Compress {health_report['backup_files']['compression_candidates']} old backup files")

        if health_report['python_artifacts']['count'] > 0:
            suggestions.append(
                f"🐍 Remove Python cache artifacts ({health_report['python_artifacts']['total_size_mb']:.1f} MB)")

        if health_report['large_files']['count'] > 0:
            suggestions.append(
                f"📏 Review {health_report['large_files']['count']} large files for archival")

        git_status = health_report.get('git_status', {})
        if git_status.get('untracked_files'):
            suggestions.append(
                f"📝 Add {len(git_status['untracked_files'])} untracked files to .gitignore or commit them")

        return suggestions

    def _analyze_disk_usage(self) -> Dict:
        """Analyze disk usage of the filesystem containing the repository."""
        try:
            total, used, free = shutil.disk_usage(self.root_path)
            return {
                'total_gb': total / (1024 ** 3),
                'used_gb': used / (1024 ** 3),
                'free_gb': free / (1024 ** 3),
                'usage_percent': (used / total) * 100
            }
        except Exception as e:
            return {'error': str(e)}

    def _cleanup_temp_files(self, dry_run: bool) -> Dict:
        """Delete temporary files (or only list them in a dry run)."""
        results = {'temp_files_deleted': [], 'space_freed_mb': 0.0}

        for pattern in self.cleanup_rules['temp_files']['patterns']:
            for file_path in self.root_path.rglob(pattern):
                if file_path.is_file() and '.git' not in file_path.parts:
                    size_mb = file_path.stat().st_size / (1024 * 1024)
                    if not dry_run:
                        try:
                            file_path.unlink()
                            results['temp_files_deleted'].append(str(file_path.relative_to(self.root_path)))
                            results['space_freed_mb'] += size_mb
                        except Exception as e:
                            self.logger.warning(f"Could not delete {file_path}: {e}")
                    else:
                        # Dry run: report what would be deleted and freed
                        results['temp_files_deleted'].append(str(file_path.relative_to(self.root_path)))
                        results['space_freed_mb'] += size_mb

        return results

    def _archive_old_logs(self, dry_run: bool) -> Dict:
        """Move log files from the repository root into reports/logs/."""
        results = {'logs_archived': []}

        # Create the reports/logs directory if it doesn't exist
        logs_dir = self.root_path / 'reports' / 'logs'
        if not dry_run:
            logs_dir.mkdir(parents=True, exist_ok=True)

        # Move log files from the root to reports/logs
        for log_file in self.root_path.glob('*.log'):
            new_path = logs_dir / log_file.name
            if not dry_run:
                try:
                    shutil.move(str(log_file), str(new_path))
                    results['logs_archived'].append(str(log_file.relative_to(self.root_path)))
                except Exception as e:
                    self.logger.warning(f"Could not move {log_file}: {e}")
            else:
                results['logs_archived'].append(str(log_file.relative_to(self.root_path)))

        return results

    def _compress_old_backups(self, dry_run: bool) -> Dict:
        """Gzip-compress backup files older than seven days."""
        results = {'backups_compressed': [], 'space_freed_mb': 0.0}
        backup_dir = self.root_path / 'backups'

        if backup_dir.exists():
            cutoff_date = datetime.now() - timedelta(days=7)

            for backup_file in backup_dir.glob('*.txt'):
                file_date = datetime.fromtimestamp(backup_file.stat().st_mtime)
                if file_date < cutoff_date:
                    original_mb = backup_file.stat().st_size / (1024 * 1024)
                    if not dry_run:
                        try:
                            # Stream the file through gzip, then delete the original
                            compressed_path = Path(f"{backup_file}.gz")
                            with open(backup_file, 'rb') as f_in:
                                with gzip.open(compressed_path, 'wb') as f_out:
                                    shutil.copyfileobj(f_in, f_out)
                            backup_file.unlink()
                            results['backups_compressed'].append(str(backup_file.relative_to(self.root_path)))
                            results['space_freed_mb'] += original_mb - compressed_path.stat().st_size / (1024 * 1024)
                        except Exception as e:
                            self.logger.warning(f"Could not compress {backup_file}: {e}")
                    else:
                        results['backups_compressed'].append(str(backup_file.relative_to(self.root_path)))

        return results
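    # A compressed backup can be read back with the standard library alone,
    # e.g. gzip.open('backups/playlist.txt.gz', 'rt').read(); the file name
    # here is purely illustrative.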
    def _cleanup_python_cache(self, dry_run: bool) -> Dict:
        """Remove Python cache files and directories."""
        results = {'python_cache_removed': [], 'space_freed_mb': 0.0}

        # Remove __pycache__ directories
        for pycache_dir in self.root_path.rglob('__pycache__'):
            if pycache_dir.is_dir():
                size_mb = sum(f.stat().st_size for f in pycache_dir.rglob('*') if f.is_file()) / (1024 * 1024)
                if not dry_run:
                    try:
                        shutil.rmtree(pycache_dir)
                        results['python_cache_removed'].append(str(pycache_dir.relative_to(self.root_path)))
                        results['space_freed_mb'] += size_mb
                    except Exception as e:
                        self.logger.warning(f"Could not remove {pycache_dir}: {e}")
                else:
                    results['python_cache_removed'].append(str(pycache_dir.relative_to(self.root_path)))
                    results['space_freed_mb'] += size_mb

        # Remove stray .pyc and .pyo files
        for pyc_file in self.root_path.rglob('*.py[co]'):
            size_mb = pyc_file.stat().st_size / (1024 * 1024)
            if not dry_run:
                try:
                    pyc_file.unlink()
                    results['python_cache_removed'].append(str(pyc_file.relative_to(self.root_path)))
                    results['space_freed_mb'] += size_mb
                except Exception as e:
                    self.logger.warning(f"Could not remove {pyc_file}: {e}")
            else:
                results['python_cache_removed'].append(str(pyc_file.relative_to(self.root_path)))
                results['space_freed_mb'] += size_mb

        return results

    def _organize_files(self, dry_run: bool) -> Dict:
        """Ensure the expected directory structure exists.

        Note: this step currently only creates directories; it does not yet
        move any files, so 'files_organized' is always empty.
        """
        results = {'files_organized': []}

        directories = [
            'reports/logs',
            'reports/archive',
            'backups/compressed',
            'templates'
        ]

        if not dry_run:
            for directory in directories:
                (self.root_path / directory).mkdir(parents=True, exist_ok=True)

        return results

    def save_health_report(self, health_report: Dict, filename: Optional[str] = None) -> Optional[Path]:
        """Save the health report as JSON under reports/."""
        if filename is None:
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            filename = f'repo_health_{timestamp}.json'

        reports_dir = self.root_path / 'reports'
        reports_dir.mkdir(exist_ok=True)
        report_path = reports_dir / filename

        try:
            with open(report_path, 'w', encoding='utf-8') as f:
                json.dump(health_report, f, indent=2, default=str)
            self.logger.info(f"Health report saved to: {report_path}")
            return report_path
        except Exception as e:
            self.logger.error(f"Could not save health report: {e}")
            return None


def main():
    """Command-line interface for repository health monitoring."""
    import argparse

    parser = argparse.ArgumentParser(description='IPTV Repository Health Monitor')
    parser.add_argument('--check', action='store_true', help='Run health check')
    parser.add_argument('--cleanup', action='store_true', help='Run auto cleanup')
    parser.add_argument('--dry-run', action='store_true', help='Dry run (no actual changes)')
    parser.add_argument('--save-report', action='store_true', help='Save health report to file')

    args = parser.parse_args()

    # Set up logging
    logging.basicConfig(
        level=logging.INFO,
        format='[%(asctime)s] %(levelname)s: %(message)s'
    )

    monitor = RepoHealthMonitor()

    if args.check or args.save_report:
        health_report = monitor.run_health_check()

        print("\n📊 Repository Health Report")
        print(f"Organization Score: {health_report['organization_score']}/100")
        print(f"Total Size: {health_report['repository_size']['total_mb']:.1f} MB")
        print(f"Total Files: {health_report['file_counts']['total_files']}")

        if health_report['cleanup_suggestions']:
            print("\n🔧 Cleanup Suggestions:")
            for suggestion in health_report['cleanup_suggestions']:
                print(f"   {suggestion}")
        else:
            print("\n✅ Repository is well organized!")

        if args.save_report:
            monitor.save_health_report(health_report)

    if args.cleanup:
        cleanup_results = monitor.auto_cleanup(dry_run=args.dry_run)

        if args.dry_run:
            print("\n🧪 Dry Run Results:")
        else:
            print("\n🧹 Cleanup Results:")

        for key, items in cleanup_results.items():
            if isinstance(items, list) and items:
                print(f"   {key}: {len(items)} items")
                for item in items[:5]:  # Show the first 5
                    print(f"     - {item}")
                if len(items) > 5:
                    print(f"     ... and {len(items) - 5} more")


if __name__ == "__main__":
    main()
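# Example invocations (assuming this file is saved as repo_health_monitor.py;
# the flags match the argparse definitions above):
#
#   python repo_health_monitor.py --check
#   python repo_health_monitor.py --check --save-report
#   python repo_health_monitor.py --cleanup --dry-run
#   python repo_health_monitor.py --cleanup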