260 lines
No EOL
11 KiB
Python
260 lines
No EOL
11 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Web Interface: Comprehensive IPTV Repository Cleanup
|
|
This script will be run automatically by the workflow
|
|
"""
|
|
|
|
import os
|
|
import shutil
|
|
import gzip
|
|
from pathlib import Path
|
|
from datetime import datetime, timedelta
|
|
|
|
def _warn(error):
    """Report a non-fatal problem; the cleanup keeps going afterwards."""
    print(f" Warning: {error}")


def _gzip_and_remove(source, target):
    """Gzip *source* into *target*, then delete *source*.

    The archive is written to a sibling ``.part`` file and renamed into place
    only once it is complete, so a failed run never leaves a truncated archive
    behind under the final name. On failure the partial file is removed, the
    source is left untouched and the exception is re-raised.
    """
    partial = target.with_name(target.name + '.part')
    try:
        with open(source, 'rb') as f_in, gzip.open(partial, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
        partial.replace(target)
    except Exception:
        if partial.exists():
            partial.unlink()
        raise
    source.unlink()


def _create_directories(root_path, cleaned):
    """Create the directory layout the remaining steps rely on."""
    directories = [
        'data/archive',
        'reports/logs', 'reports/daily', 'reports/archive',
        'backups/archive',
        'templates',
    ]
    for directory in directories:
        (root_path / directory).mkdir(parents=True, exist_ok=True)
        cleaned.append(f"Created: {directory}/")


def _clean_backups(root_path, cleaned):
    """Archive all but the 3 newest backups, then gzip the rest in place."""
    backups_dir = root_path / 'backups'
    if not backups_dir.exists():
        return

    # Newest first, so everything past index 2 is an "old" backup.
    backup_files = sorted(backups_dir.glob('channels_*.txt'),
                          key=lambda x: x.stat().st_mtime, reverse=True)
    archive_dir = backups_dir / 'archive'
    for old_backup in backup_files[3:]:
        try:
            archive_dir.mkdir(exist_ok=True)
            _gzip_and_remove(old_backup, archive_dir / f"{old_backup.stem}.gz")
            cleaned.append(f"Compressed & archived: {old_backup.name}")
        except Exception as e:
            _warn(e)

    # Materialize the listing first: the loop deletes and creates files in
    # the directory being listed.
    for backup_file in list(backups_dir.glob('channels_*.txt')):
        try:
            compressed_path = backup_file.with_suffix('.txt.gz')
            if not compressed_path.exists():
                _gzip_and_remove(backup_file, compressed_path)
                cleaned.append(f"Compressed: {backup_file.name}")
        except Exception as e:
            _warn(e)


def _organize_reports(root_path, cleaned):
    """Move playlist reports into reports/daily and archive week-old ones."""
    reports_dir = root_path / 'reports'
    daily_dir = reports_dir / 'daily'

    for report_file in list(reports_dir.glob('playlist_report_*.md')):
        try:
            daily_dir.mkdir(exist_ok=True)
            shutil.move(str(report_file), str(daily_dir / report_file.name))
            cleaned.append(f"Moved: {report_file.name} → reports/daily/")
        except Exception as e:
            _warn(e)

    if not daily_dir.exists():
        return

    # Reports last modified more than 7 days ago go to reports/archive/YYYY-MM.
    cutoff_date = datetime.now() - timedelta(days=7)
    for report_file in list(daily_dir.glob('*.md')):
        try:
            file_date = datetime.fromtimestamp(report_file.stat().st_mtime)
            if file_date < cutoff_date:
                month_folder = reports_dir / 'archive' / file_date.strftime('%Y-%m')
                month_folder.mkdir(parents=True, exist_ok=True)
                shutil.move(str(report_file), str(month_folder / report_file.name))
                cleaned.append(f"Archived: {report_file.name}")
        except Exception as e:
            _warn(e)


def _remove_python_cache(root_path, cleaned):
    """Delete every __pycache__ directory and stray .pyc/.pyo file."""
    # Snapshot the matches before deleting so the tree is not modified while
    # it is still being walked.
    for cache_dir in list(root_path.rglob('__pycache__')):
        if cache_dir.is_dir():
            try:
                shutil.rmtree(cache_dir)
                cleaned.append(f"Removed: {cache_dir.relative_to(root_path)}")
            except Exception as e:
                _warn(e)

    for pyc_file in list(root_path.rglob('*.pyc')) + list(root_path.rglob('*.pyo')):
        try:
            pyc_file.unlink()
            cleaned.append(f"Removed: {pyc_file.relative_to(root_path)}")
        except Exception as e:
            _warn(e)


def _clean_scripts(root_path, cleaned):
    """Fold scripts/config into config/ and make sure scripts/ is a package."""
    scripts_dir = root_path / 'scripts'
    scripts_config = scripts_dir / 'config'

    if scripts_config.exists():
        try:
            main_config = root_path / 'config'
            main_config.mkdir(exist_ok=True)
            conflicts = 0
            for config_file in list(scripts_config.rglob('*')):
                if not config_file.is_file():
                    continue
                new_path = main_config / config_file.name
                if new_path.exists():
                    # Never overwrite an existing config file, and never
                    # delete the copy that could not be moved.
                    conflicts += 1
                    print(f" Warning: kept {config_file.relative_to(root_path)} "
                          f"(config/{config_file.name} already exists)")
                    continue
                shutil.move(str(config_file), str(new_path))
                cleaned.append(f"Moved: {config_file.name} from scripts/config/")
            # Only drop the directory once nothing unmoved is left inside it.
            if conflicts == 0:
                shutil.rmtree(scripts_config)
                cleaned.append("Removed: scripts/config/ directory")
        except Exception as e:
            _warn(e)

    # Without the is_dir() guard a repository that has no scripts/ folder
    # would raise here and abort every later cleanup step.
    init_file = scripts_dir / '__init__.py'
    if scripts_dir.is_dir() and not init_file.exists():
        with open(init_file, 'w') as f:
            f.write('# IPTV Scripts Package\n')
        cleaned.append("Created: scripts/__init__.py")


def _clean_root(root_path, cleaned):
    """Remove setup scripts and temp files from the root; relocate *.log."""
    for setup_file in list(root_path.glob('setup_*.py')):
        try:
            setup_file.unlink()
            cleaned.append(f"Removed: {setup_file.name}")
        except Exception as e:
            _warn(e)

    logs_dir = root_path / 'reports' / 'logs'
    logs_dir.mkdir(parents=True, exist_ok=True)
    for log_file in list(root_path.glob('*.log')):
        try:
            shutil.move(str(log_file), str(logs_dir / log_file.name))
            cleaned.append(f"Moved: {log_file.name} → reports/logs/")
        except Exception as e:
            _warn(e)

    # NOTE(review): '*_temp*' matches any root-level file whose name contains
    # "_temp" (e.g. "x_template.txt") - confirm that is intended.
    patterns = ['*_temp*', '*.tmp', '*~', '*.swp', '*.swo']
    for pattern in patterns:
        for temp_file in list(root_path.glob(pattern)):
            if temp_file.is_file():
                try:
                    temp_file.unlink()
                    cleaned.append(f"Removed: {temp_file.name}")
                except Exception as e:
                    _warn(e)


def _create_snapshot(root_path, cleaned):
    """Copy channels.txt to data/YYYY-MM/channels_YYYYMMDD.txt once per day."""
    channels_file = root_path / 'channels.txt'
    if not channels_file.exists():
        return
    try:
        today = datetime.now()
        data_dir = root_path / 'data' / today.strftime('%Y-%m')
        data_dir.mkdir(parents=True, exist_ok=True)

        snapshot_name = f"channels_{today.strftime('%Y%m%d')}.txt"
        snapshot_path = data_dir / snapshot_name
        if not snapshot_path.exists():
            shutil.copy2(channels_file, snapshot_path)
            cleaned.append(f"Created: data snapshot {snapshot_name}")
    except Exception as e:
        _warn(e)


def _remove_cleanup_script(root_path, cleaned):
    """Delete comprehensive_cleanup.py from the root if it is present."""
    cleanup_script = root_path / 'comprehensive_cleanup.py'
    if cleanup_script.exists():
        try:
            cleanup_script.unlink()
            cleaned.append("Removed: comprehensive_cleanup.py (cleanup complete)")
        except Exception as e:
            print(f" Warning: Could not remove cleanup script: {e}")


def _print_repository_status(root_path):
    """Print file count, size and leftover-clutter counters for the tree."""
    print("\n📊 Repository status:")
    try:
        total_files = 0
        repo_size = 0
        for path in root_path.rglob('*'):
            # Skip git internals by path component; a substring test would
            # also drop .github/ and .gitignore.
            if '.git' in path.relative_to(root_path).parts:
                continue
            if path.is_file():
                total_files += 1
                repo_size += path.stat().st_size
        repo_size_mb = repo_size / (1024 * 1024)

        print(f" 📁 Total files: {total_files}")
        print(f" 💾 Repository size: {repo_size_mb:.1f} MB")

        cache_dirs = len(list(root_path.rglob('__pycache__')))
        temp_files = len(list(root_path.rglob('*.tmp')))
        log_files_root = len(list(root_path.glob('*.log')))

        print(f" 🧹 Cache directories: {cache_dirs}")
        print(f" 🗑️ Temp files: {temp_files}")
        print(f" 📋 Root log files: {log_files_root}")

        if cache_dirs == 0 and temp_files == 0 and log_files_root == 0:
            print(" ✅ Repository is now clean!")
        else:
            print(" 🟡 Some cleanup items remain")
    except Exception as e:
        print(f" Could not calculate stats: {e}")


def cleanup_repository():
    """Main cleanup function for web interface.

    Operates on the current working directory. The steps, in order: create
    the directory layout, compress backups, organize reports, remove Python
    caches, tidy ``scripts/``, tidy the root, snapshot ``channels.txt`` and
    remove ``comprehensive_cleanup.py``. A summary and repository status are
    printed at the end.

    Per-file failures are reported as warnings and do not stop the run.

    Returns:
        bool: True when all steps ran, False when an unexpected error
        aborted the run.
    """
    print("🎯 IPTV Repository Comprehensive Cleanup")
    print("=" * 50)

    root_path = Path.cwd()
    cleaned = []  # human-readable log of every action taken

    print(f"📁 Working in: {root_path}")
    print("🚀 Starting cleanup...")

    try:
        print("📁 Creating directory structure...")
        _create_directories(root_path, cleaned)

        print("💾 Cleaning backups folder...")
        _clean_backups(root_path, cleaned)

        print("📋 Organizing reports...")
        _organize_reports(root_path, cleaned)

        print("🐍 Removing Python cache...")
        _remove_python_cache(root_path, cleaned)

        print("🔧 Cleaning scripts folder...")
        _clean_scripts(root_path, cleaned)

        print("🧹 Cleaning root directory...")
        _clean_root(root_path, cleaned)

        print("📊 Creating data snapshot...")
        _create_snapshot(root_path, cleaned)

        _remove_cleanup_script(root_path, cleaned)

        print(f"\n✅ Cleanup complete! Processed {len(cleaned)} items")

        if cleaned:
            print("\n🔧 Actions taken:")
            for item in cleaned[:10]:  # Show first 10
                print(f" ✅ {item}")
            if len(cleaned) > 10:
                print(f" ... and {len(cleaned) - 10} more items")

        _print_repository_status(root_path)

        return True

    except Exception as e:
        print(f"❌ Error during cleanup: {e}")
        return False
|
|
|
|
if __name__ == "__main__":
|
|
success = cleanup_repository()
|
|
if success:
|
|
print("\n🎉 Repository cleanup successful!")
|
|
else:
|
|
print("\n⚠️ Repository cleanup completed with warnings") |