#!/usr/bin/env python3
"""
IPTV Playlist Generator - FIXED to run from root directory
Enhanced debugging with working imports and method calls
"""

import logging
import os
import shutil
import sys
import traceback
from datetime import datetime
from pathlib import Path

# FIXED: Change to root directory and add scripts to path
script_dir = Path(__file__).parent
root_dir = script_dir.parent

# Change working directory to root so the relative file names used below
# ('channels.txt', 'bulk_import.m3u', 'playlist.m3u') resolve against it.
os.chdir(root_dir)

# Add scripts directory to Python path so the sibling modules can be imported.
sys.path.insert(0, str(script_dir))

# FIXED: Import our modular components with proper error handling
try:
    from config_manager import ConfigManager
    from channel_processor import ChannelProcessor
    from file_manager import FileManager
    from playlist_builder import PlaylistBuilder
    from health_checker import HealthChecker
    from report_generator import ReportGenerator
except ImportError as e:
    print(f"Import error: {e}")
    print("Make sure all required modules are in the scripts directory")
    sys.exit(1)


def setup_logging():
    """Configure root logging to write to 'playlist_update.log' and the console."""
    logging.basicConfig(
        level=logging.INFO,  # Changed back to INFO for cleaner output
        format='[%(asctime)s] %(levelname)s: %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
        handlers=[
            logging.FileHandler('playlist_update.log', encoding='utf-8'),
            logging.StreamHandler()
        ]
    )


def debug_file_system():
    """Log the working directory and the presence/size of the key data files.

    Purely diagnostic: every failure is logged and swallowed so that this
    helper can never abort playlist generation.
    """
    logging.info("=== FILE SYSTEM DEBUG ===")

    # Check current directory
    current_dir = os.getcwd()
    logging.info(f"Current working directory: {current_dir}")

    # List all files in current directory
    try:
        files = os.listdir('.')
        logging.info(f"Files in current directory: {len(files)} files")
    except Exception as e:
        logging.error(f"Could not list current directory: {e}")

    # Check for specific files
    files_to_check = [
        'bulk_import.m3u',
        'channels.txt',
        'playlist.m3u'
    ]

    for file_path in files_to_check:
        if os.path.exists(file_path):
            try:
                size = os.path.getsize(file_path)
                logging.info(f"✅ Found {file_path} (size: {size} bytes)")

                # If it's the import file, show first few lines
                if 'bulk_import.m3u' in file_path and size > 0:
                    with open(file_path, 'r', encoding='utf-8') as f:
                        first_lines = [f.readline().strip() for _ in range(3)]
                    logging.info(f" First 3 lines: {first_lines}")
            except Exception as e:
                logging.error(f" Error reading {file_path}: {e}")
        else:
            logging.info(f"❌ Missing: {file_path}")

    logging.info("=== END FILE SYSTEM DEBUG ===")


def load_existing_channels(channels_file):
    """Load channel records from a 'key = value' block file.

    The file is split into blocks on blank lines; inside a block every line
    containing '=' is split on the first '=' into a stripped key and value.
    Blocks without a non-empty 'Stream name' are discarded.

    Args:
        channels_file: Path of the channels file to read.

    Returns:
        A list of dicts, one per channel. The list is empty when the file
        does not exist. A read/parse error is logged and whatever was
        parsed so far is returned.
    """
    channels = []

    if not os.path.exists(channels_file):
        logging.info(f"No existing channels file found: {channels_file}")
        return channels

    try:
        with open(channels_file, 'r', encoding='utf-8') as f:
            content = f.read()

        # Split into channel blocks
        blocks = content.split('\n\n')

        for block in blocks:
            if not block.strip():
                continue

            # Parse channel block
            channel_data = {}
            lines = block.strip().split('\n')

            for line in lines:
                if '=' in line:
                    # Split on the first '=' only: URLs may contain '='.
                    key, value = line.split('=', 1)
                    channel_data[key.strip()] = value.strip()

            if channel_data and channel_data.get('Stream name'):
                channels.append(channel_data)

        logging.info(f"Loaded {len(channels)} existing channels")

    except Exception as e:
        logging.error(f"Error loading existing channels: {e}")

    return channels


def save_channels_to_file(channels, filename):
    """Write channel records to *filename* in the 'key = value' block format.

    Each channel is written as five lines (Group, Stream name, Logo, EPG id,
    Stream URL); missing keys fall back to 'Uncategorized', 'Unknown' or an
    empty string. Consecutive channels are separated by blank lines.

    Args:
        channels: Iterable of channel dicts.
        filename: Destination path; the file is overwritten.

    Returns:
        True on success, False if writing raised (the error is logged).
    """
    try:
        with open(filename, 'w', encoding='utf-8') as f:
            for i, channel in enumerate(channels):
                if i > 0:
                    f.write("\n\n")

                f.write(f"Group = {channel.get('Group', 'Uncategorized')}\n")
                f.write(f"Stream name = {channel.get('Stream name', 'Unknown')}\n")
                f.write(f"Logo = {channel.get('Logo', '')}\n")
                f.write(f"EPG id = {channel.get('EPG id', '')}\n")
                f.write(f"Stream URL = {channel.get('Stream URL', '')}\n")

        logging.info(f"Successfully saved {len(channels)} channels to {filename}")
        return True

    except Exception as e:
        logging.error(f"Error saving channels to {filename}: {e}")
        return False


def generate_m3u_playlist(channels, playlist_file):
    """Write an extended M3U playlist for every channel with a name and URL.

    Args:
        channels: Iterable of channel dicts (keys 'Stream name', 'Group',
            'Logo', 'EPG id', 'Stream URL').
        playlist_file: Destination path; the file is overwritten.

    Returns:
        A ``(valid_channels, country_stats)`` tuple: the number of entries
        written and a dict mapping each group title to its entry count.
        On any error the exception is logged and ``(0, {})`` is returned.
    """
    try:
        with open(playlist_file, 'w', encoding='utf-8') as f:
            f.write('#EXTM3U\n')

            valid_channels = 0
            country_stats = {}

            for channel in channels:
                stream_name = channel.get('Stream name', '')
                group = channel.get('Group', 'Uncategorized')
                logo = channel.get('Logo', '')
                epg_id = channel.get('EPG id', '')
                url = channel.get('Stream URL', '')

                # Entries lacking a name or a URL are skipped.
                if stream_name and url:
                    f.write(f'#EXTINF:-1 group-title="{group}"')
                    if logo:
                        f.write(f' tvg-logo="{logo}"')
                    if epg_id:
                        f.write(f' tvg-id="{epg_id}"')
                    f.write(f',{stream_name}\n')
                    f.write(f'{url}\n')
                    valid_channels += 1

                    # Count by country
                    country_stats[group] = country_stats.get(group, 0) + 1

        logging.info(f"Generated M3U playlist with {valid_channels} channels across {len(country_stats)} groups")
        return valid_channels, country_stats

    except Exception as e:
        logging.error(f"Error generating M3U playlist: {e}")
        return 0, {}


def generate_playlist():
    """Main playlist generation function with enhanced debugging.

    Runs the pipeline end to end: back up 'channels.txt', ask the channel
    processor to clean and re-tag existing channels, process
    'bulk_import.m3u' if present, reload, de-duplicate, sort and save the
    channels, write 'playlist.m3u', and log a summary.

    The processor steps, the import and the sort are best-effort: a failure
    there is logged and the run continues.

    Returns:
        True if the run reached the end, False if an unexpected exception
        escaped (it is logged together with its traceback).
    """
    try:
        setup_logging()
        logging.info("🚀 Starting enhanced playlist generation...")

        # Debug file system first
        debug_file_system()

        # Initialize configuration
        logging.info("📋 Initializing configuration...")
        config = ConfigManager()

        # Debug config
        logging.info(f"Config channels_file: {config.channels_file}")
        logging.info(f"Config import_file: {config.import_file}")

        # Initialize processor
        processor = ChannelProcessor(config)

        # Statistics tracking
        stats = {
            'total_channels': 0,
            'valid_channels': 0,
            'imported_channels': 0,
            'countries_detected': 0,
            'country_distribution': {}
        }

        # Step 1: Create backup if channels.txt exists
        logging.info("=== STEP 1: Creating backup ===")
        if os.path.exists('channels.txt'):
            try:
                backup_name = f"channels_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
                shutil.copy2('channels.txt', backup_name)
                logging.info(f"✅ Created backup: {backup_name}")
            except Exception as e:
                logging.warning(f"Could not create backup: {e}")

        # Step 2: Clean existing corrupted entries
        logging.info("=== STEP 2: Cleaning corrupted channels ===")
        try:
            processor.clean_corrupted_channels()
            logging.info("✅ Corruption cleanup completed")
        except Exception as e:
            logging.warning(f"Corruption cleanup error: {e}")

        # Step 3: Force update existing channels with new country detection
        logging.info("=== STEP 3: Updating existing channels with enhanced country detection ===")
        try:
            processor.update_existing_channels_with_country_detection()
            logging.info("✅ Country detection update completed")
        except Exception as e:
            logging.warning(f"Country detection update error: {e}")

        # Step 4: Process imports
        logging.info("=== STEP 4: Processing imports ===")

        # Check for import file
        if os.path.exists('bulk_import.m3u'):
            logging.info("✅ Found bulk_import.m3u")
            try:
                # Read only to size-check; the handle is closed before the
                # processor touches the file itself.
                with open('bulk_import.m3u', 'r', encoding='utf-8') as f:
                    content = f.read()
                logging.info(f"Import file has {len(content)} characters")

                if len(content.strip()) > 10:  # More than just #EXTM3U
                    # NOTE(review): process_import() is assumed to return a
                    # sized collection of the newly imported channels.
                    imported_channels = processor.process_import()
                    stats['imported_channels'] = len(imported_channels)
                    logging.info(f"✅ Imported {len(imported_channels)} new channels")
                else:
                    logging.info("Import file is empty, skipping import")
            except Exception as e:
                logging.error(f"Error processing import: {e}")
        else:
            logging.info("No import file found, skipping import")

        # Step 5: Load all channels
        logging.info("=== STEP 5: Loading all channels ===")
        all_channels = load_existing_channels('channels.txt')
        stats['total_channels'] = len(all_channels)
        logging.info(f"✅ Loaded {len(all_channels)} total channels")

        # Step 6: Remove duplicates
        logging.info("=== STEP 6: Removing duplicates ===")
        try:
            unique_channels = processor.remove_duplicates_optimized(all_channels)
            duplicates_removed = len(all_channels) - len(unique_channels)
            logging.info(f"✅ After deduplication: {len(unique_channels)} channels ({duplicates_removed} duplicates removed)")
        except Exception as e:
            logging.warning(f"Deduplication error: {e}, using original channels")
            unique_channels = all_channels

        # Step 7: Sort channels by group and name
        logging.info("=== STEP 7: Sorting channels ===")
        try:
            unique_channels.sort(key=lambda x: (x.get('Group', '').lower(), x.get('Stream name', '').lower()))
            logging.info("✅ Channels sorted by group and name")
        except Exception as e:
            logging.warning(f"Sorting error: {e}")

        # Step 8: Save updated channels
        logging.info("=== STEP 8: Saving updated channels ===")
        if save_channels_to_file(unique_channels, 'channels.txt'):
            logging.info("✅ Successfully saved updated channels.txt")
        else:
            logging.error("❌ Failed to save channels.txt")

        # Step 9: Generate M3U playlist
        logging.info("=== STEP 9: Generating M3U playlist ===")
        valid_channels, country_stats = generate_m3u_playlist(unique_channels, 'playlist.m3u')
        stats['valid_channels'] = valid_channels
        stats['country_distribution'] = country_stats
        stats['countries_detected'] = len(country_stats)

        # Step 10: Generate summary report
        logging.info("=== STEP 10: Generating summary report ===")

        # Show top countries
        sorted_countries = sorted(country_stats.items(), key=lambda x: x[1], reverse=True)
        logging.info("🌍 Top Countries/Groups:")
        for country, count in sorted_countries[:10]:
            percentage = (count / valid_channels * 100) if valid_channels > 0 else 0
            logging.info(f" {country}: {count} channels ({percentage:.1f}%)")

        # Final summary
        logging.info("🎉 PLAYLIST GENERATION COMPLETED SUCCESSFULLY!")
        logging.info("📊 Final Statistics:")
        logging.info(f" 📺 Total channels processed: {stats['total_channels']}")
        logging.info(f" ✅ Valid channels in playlist: {stats['valid_channels']}")
        logging.info(f" 📥 New channels imported: {stats['imported_channels']}")
        logging.info(f" 🌍 Countries/groups detected: {stats['countries_detected']}")

        # Final debug
        logging.info("=== FINAL FILE CHECK ===")
        debug_file_system()

        return True

    except Exception as e:
        logging.error(f"❌ Fatal error in playlist generation: {e}")
        logging.error(traceback.format_exc())
        return False


if __name__ == "__main__":
    success = generate_playlist()
    # sys.exit rather than the site-provided exit(), which is not guaranteed
    # to exist (e.g. under `python -S`).
    sys.exit(0 if success else 1)