#!/usr/bin/env python3 """ IPTV Playlist Generator - Main Script Modular design for better maintainability and easier development. """ import logging import os from datetime import datetime # Import our modular components from scripts.config_manager import ConfigManager from scripts.channel_processor import ChannelProcessor from scripts.file_manager import FileManager from scripts.playlist_builder import PlaylistBuilder from scripts.health_checker import HealthChecker from scripts.report_generator import ReportGenerator def setup_logging(): """Setup comprehensive logging.""" logging.basicConfig( level=logging.INFO, format='[%(asctime)s] %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S', handlers=[ logging.FileHandler('playlist_update.log', encoding='utf-8'), logging.StreamHandler() ] ) def generate_playlist(): """Main playlist generation function - coordinates all modules.""" try: setup_logging() logging.info("Starting modular playlist generation...") # Initialize all modules config = ConfigManager() file_manager = FileManager(config) processor = ChannelProcessor(config) builder = PlaylistBuilder(config) health_checker = HealthChecker(config) report_gen = ReportGenerator(config) # Clear log file if os.path.exists('playlist_update.log'): open('playlist_update.log', 'w').close() # Statistics tracking stats = { 'total_channels': 0, 'valid_channels': 0, 'duplicates_removed': 0, 'imported_channels': 0, 'countries_detected': 0, 'country_distribution': {} } # Step 1: Create backup file_manager.create_backup('channels.txt') # Step 2: Clean existing corrupted entries processor.clean_corrupted_channels() # Step 3: Force update existing channels with new country detection processor.update_existing_channels_with_country_detection() # Step 4: Process imports imported_channels = processor.process_import() stats['imported_channels'] = len(imported_channels) logging.info(f"Import returned {len(imported_channels)} channels") # Step 5: Load all channels all_channels = file_manager.load_all_channels() stats['total_channels'] = len(all_channels) # Step 6: Remove duplicates unique_channels = processor.remove_duplicates_optimized(all_channels) stats['duplicates_removed'] = len(all_channels) - len(unique_channels) # Step 7: Sort channels if config.settings.get('sort_channels', True): unique_channels.sort(key=lambda x: (x.get('Group', '').lower(), x.get('Stream name', '').lower())) # Step 8: Health check (optional) health_results = {} if config.settings.get('enable_health_check', False): health_results = health_checker.batch_health_check(unique_channels) # Step 9: Generate M3U playlist valid_channels, country_stats = builder.generate_m3u(unique_channels) stats['valid_channels'] = valid_channels stats['country_distribution'] = country_stats stats['countries_detected'] = len(country_stats) # Step 10: Generate report report_gen.save_report(stats, health_results) logging.info(f"Playlist generation complete: {valid_channels} channels across {len(country_stats)} countries") logging.info(f"Top countries: {dict(list(sorted(country_stats.items(), key=lambda x: x[1], reverse=True))[:5])}") return True except Exception as e: logging.error(f"Fatal error in playlist generation: {e}") return False if __name__ == "__main__": success = generate_playlist() exit(0 if success else 1)