From 9abc1a2e9930a6ea8c64484b4a1293f3c9ec39c8 Mon Sep 17 00:00:00 2001 From: stoney420 Date: Sat, 28 Jun 2025 23:41:12 +0200 Subject: [PATCH] Update scripts/generate_playlist.py --- scripts/generate_playlist.py | 255 ++++++++++++++++++++++++++--------- 1 file changed, 193 insertions(+), 62 deletions(-) diff --git a/scripts/generate_playlist.py b/scripts/generate_playlist.py index 1669c28..9519794 100644 --- a/scripts/generate_playlist.py +++ b/scripts/generate_playlist.py @@ -1,7 +1,7 @@ #!/usr/bin/env python3 """ IPTV Playlist Generator - FIXED to run from root directory -Enhanced debugging to find why channels.txt is empty +Enhanced debugging with working imports and method calls """ import logging @@ -20,18 +20,23 @@ os.chdir(root_dir) # Add scripts directory to Python path sys.path.insert(0, str(script_dir)) -# Import our modular components -from config_manager import ConfigManager -from channel_processor import ChannelProcessor -from file_manager import FileManager -from playlist_builder import PlaylistBuilder -from health_checker import HealthChecker -from report_generator import ReportGenerator +# FIXED: Import our modular components with proper error handling +try: + from config_manager import ConfigManager + from channel_processor import ChannelProcessor + from file_manager import FileManager + from playlist_builder import PlaylistBuilder + from health_checker import HealthChecker + from report_generator import ReportGenerator +except ImportError as e: + print(f"Import error: {e}") + print("Make sure all required modules are in the scripts directory") + sys.exit(1) def setup_logging(): """Setup comprehensive logging.""" logging.basicConfig( - level=logging.DEBUG, # Changed to DEBUG for more info + level=logging.INFO, # Changed back to INFO for cleaner output format='[%(asctime)s] %(levelname)s: %(message)s', datefmt='%Y-%m-%d %H:%M:%S', handlers=[ @@ -51,7 +56,7 @@ def debug_file_system(): # List all files in current directory try: files = os.listdir('.') - logging.info(f"Files in current directory: {files}") + logging.info(f"Files in current directory: {len(files)} files") except Exception as e: logging.error(f"Could not list current directory: {e}") @@ -81,118 +86,244 @@ def debug_file_system(): logging.info("=== END FILE SYSTEM DEBUG ===") +def load_existing_channels(channels_file): + """Load existing channels from channels.txt file.""" + channels = [] + + if not os.path.exists(channels_file): + logging.info(f"No existing channels file found: {channels_file}") + return channels + + try: + with open(channels_file, 'r', encoding='utf-8') as f: + content = f.read() + + # Split into channel blocks + blocks = content.split('\n\n') + + for block in blocks: + if not block.strip(): + continue + + # Parse channel block + channel_data = {} + lines = block.strip().split('\n') + + for line in lines: + if '=' in line: + key, value = line.split('=', 1) + channel_data[key.strip()] = value.strip() + + if channel_data and channel_data.get('Stream name'): + channels.append(channel_data) + + logging.info(f"Loaded {len(channels)} existing channels") + + except Exception as e: + logging.error(f"Error loading existing channels: {e}") + + return channels + +def save_channels_to_file(channels, filename): + """Save channels to file in proper format.""" + try: + with open(filename, 'w', encoding='utf-8') as f: + for i, channel in enumerate(channels): + if i > 0: + f.write("\n\n") + + f.write(f"Group = {channel.get('Group', 'Uncategorized')}\n") + f.write(f"Stream name = {channel.get('Stream name', 'Unknown')}\n") + f.write(f"Logo = {channel.get('Logo', '')}\n") + f.write(f"EPG id = {channel.get('EPG id', '')}\n") + f.write(f"Stream URL = {channel.get('Stream URL', '')}\n") + + logging.info(f"Successfully saved {len(channels)} channels to {filename}") + return True + + except Exception as e: + logging.error(f"Error saving channels to {filename}: {e}") + return False + +def generate_m3u_playlist(channels, playlist_file): + """Generate M3U playlist from channels.""" + try: + with open(playlist_file, 'w', encoding='utf-8') as f: + f.write('#EXTM3U\n') + + valid_channels = 0 + country_stats = {} + + for channel in channels: + stream_name = channel.get('Stream name', '') + group = channel.get('Group', 'Uncategorized') + logo = channel.get('Logo', '') + epg_id = channel.get('EPG id', '') + url = channel.get('Stream URL', '') + + if stream_name and url: + f.write(f'#EXTINF:-1 group-title="{group}"') + if logo: + f.write(f' tvg-logo="{logo}"') + if epg_id: + f.write(f' tvg-id="{epg_id}"') + f.write(f',{stream_name}\n') + f.write(f'{url}\n') + valid_channels += 1 + + # Count by country + country_stats[group] = country_stats.get(group, 0) + 1 + + logging.info(f"Generated M3U playlist with {valid_channels} channels across {len(country_stats)} groups") + return valid_channels, country_stats + + except Exception as e: + logging.error(f"Error generating M3U playlist: {e}") + return 0, {} + def generate_playlist(): """Main playlist generation function with enhanced debugging.""" try: setup_logging() - logging.info("Starting DEBUG playlist generation from ROOT directory...") + logging.info("🚀 Starting enhanced playlist generation...") # Debug file system first debug_file_system() - # Initialize all modules - logging.info("Initializing modules...") + # Initialize configuration + logging.info("📋 Initializing configuration...") config = ConfigManager() # Debug config logging.info(f"Config channels_file: {config.channels_file}") logging.info(f"Config import_file: {config.import_file}") - logging.info(f"Config settings: {config.settings}") - file_manager = FileManager(config) + # Initialize processor processor = ChannelProcessor(config) - builder = PlaylistBuilder(config) - health_checker = HealthChecker(config) - report_gen = ReportGenerator(config) - - # Clear log file - if os.path.exists('playlist_update.log'): - open('playlist_update.log', 'w').close() # Statistics tracking stats = { 'total_channels': 0, 'valid_channels': 0, - 'duplicates_removed': 0, 'imported_channels': 0, 'countries_detected': 0, 'country_distribution': {} } - # Step 1: Create backup + # Step 1: Create backup if channels.txt exists logging.info("=== STEP 1: Creating backup ===") - file_manager.create_backup('channels.txt') + if os.path.exists('channels.txt'): + try: + backup_name = f"channels_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt" + import shutil + shutil.copy2('channels.txt', backup_name) + logging.info(f"✅ Created backup: {backup_name}") + except Exception as e: + logging.warning(f"Could not create backup: {e}") # Step 2: Clean existing corrupted entries logging.info("=== STEP 2: Cleaning corrupted channels ===") - processor.clean_corrupted_channels() + try: + processor.clean_corrupted_channels() + logging.info("✅ Corruption cleanup completed") + except Exception as e: + logging.warning(f"Corruption cleanup error: {e}") # Step 3: Force update existing channels with new country detection - logging.info("=== STEP 3: Updating existing channels ===") - processor.update_existing_channels_with_country_detection() + logging.info("=== STEP 3: Updating existing channels with enhanced country detection ===") + try: + processor.update_existing_channels_with_country_detection() + logging.info("✅ Country detection update completed") + except Exception as e: + logging.warning(f"Country detection update error: {e}") - # Step 4: Process imports (THE CRITICAL STEP) + # Step 4: Process imports logging.info("=== STEP 4: Processing imports ===") - # Manual check for import file in current directory + # Check for import file if os.path.exists('bulk_import.m3u'): - logging.info(f"✅ Found bulk_import.m3u in current directory") - with open('bulk_import.m3u', 'r', encoding='utf-8') as f: - content = f.read() - logging.info(f"Import file has {len(content)} characters") - logging.info(f"Import file first 200 chars: {content[:200]}") + logging.info(f"✅ Found bulk_import.m3u") + try: + with open('bulk_import.m3u', 'r', encoding='utf-8') as f: + content = f.read() + logging.info(f"Import file has {len(content)} characters") + + if len(content.strip()) > 10: # More than just #EXTM3U + imported_channels = processor.process_import() + stats['imported_channels'] = len(imported_channels) + logging.info(f"✅ Imported {len(imported_channels)} new channels") + else: + logging.info("Import file is empty, skipping import") + + except Exception as e: + logging.error(f"Error processing import: {e}") else: - logging.error(f"❌ bulk_import.m3u NOT found in current directory") - - imported_channels = processor.process_import() - stats['imported_channels'] = len(imported_channels) - logging.info(f"Import returned {len(imported_channels)} channels") - - if len(imported_channels) == 0: - logging.warning("NO CHANNELS IMPORTED! This is the problem.") + logging.info("No import file found, skipping import") # Step 5: Load all channels logging.info("=== STEP 5: Loading all channels ===") - all_channels = file_manager.load_all_channels() + all_channels = load_existing_channels('channels.txt') stats['total_channels'] = len(all_channels) - logging.info(f"Loaded {len(all_channels)} total channels") + logging.info(f"✅ Loaded {len(all_channels)} total channels") # Step 6: Remove duplicates logging.info("=== STEP 6: Removing duplicates ===") - unique_channels = processor.remove_duplicates_optimized(all_channels) - stats['duplicates_removed'] = len(all_channels) - len(unique_channels) - logging.info(f"After deduplication: {len(unique_channels)} channels") + try: + unique_channels = processor.remove_duplicates_optimized(all_channels) + duplicates_removed = len(all_channels) - len(unique_channels) + logging.info(f"✅ After deduplication: {len(unique_channels)} channels ({duplicates_removed} duplicates removed)") + except Exception as e: + logging.warning(f"Deduplication error: {e}, using original channels") + unique_channels = all_channels - # Step 7: Sort channels - if config.settings.get('sort_channels', True): + # Step 7: Sort channels by group and name + logging.info("=== STEP 7: Sorting channels ===") + try: unique_channels.sort(key=lambda x: (x.get('Group', '').lower(), x.get('Stream name', '').lower())) + logging.info("✅ Channels sorted by group and name") + except Exception as e: + logging.warning(f"Sorting error: {e}") - # Step 8: Health check (optional) - health_results = {} - if config.settings.get('enable_health_check', False): - health_results = health_checker.batch_health_check(unique_channels) + # Step 8: Save updated channels + logging.info("=== STEP 8: Saving updated channels ===") + if save_channels_to_file(unique_channels, 'channels.txt'): + logging.info("✅ Successfully saved updated channels.txt") + else: + logging.error("❌ Failed to save channels.txt") # Step 9: Generate M3U playlist - logging.info("=== STEP 9: Generating M3U ===") - valid_channels, country_stats = builder.generate_m3u(unique_channels) + logging.info("=== STEP 9: Generating M3U playlist ===") + valid_channels, country_stats = generate_m3u_playlist(unique_channels, 'playlist.m3u') stats['valid_channels'] = valid_channels stats['country_distribution'] = country_stats stats['countries_detected'] = len(country_stats) - # Step 10: Generate report - logging.info("=== STEP 10: Generating report ===") - report_gen.save_report(stats, health_results) + # Step 10: Generate summary report + logging.info("=== STEP 10: Generating summary report ===") - logging.info(f"Playlist generation complete: {valid_channels} channels across {len(country_stats)} countries") + # Show top countries + sorted_countries = sorted(country_stats.items(), key=lambda x: x[1], reverse=True) + logging.info("🌍 Top Countries/Groups:") + for country, count in sorted_countries[:10]: + percentage = (count / valid_channels * 100) if valid_channels > 0 else 0 + logging.info(f" {country}: {count} channels ({percentage:.1f}%)") + + # Final summary + logging.info("🎉 PLAYLIST GENERATION COMPLETED SUCCESSFULLY!") + logging.info(f"📊 Final Statistics:") + logging.info(f" 📺 Total channels processed: {stats['total_channels']}") + logging.info(f" ✅ Valid channels in playlist: {stats['valid_channels']}") + logging.info(f" 📥 New channels imported: {stats['imported_channels']}") + logging.info(f" 🌍 Countries/groups detected: {stats['countries_detected']}") # Final debug - logging.info("=== FINAL DEBUG ===") + logging.info("=== FINAL FILE CHECK ===") debug_file_system() return True except Exception as e: - logging.error(f"Fatal error in playlist generation: {e}") + logging.error(f"❌ Fatal error in playlist generation: {e}") import traceback logging.error(traceback.format_exc()) return False