From 5c299c99f8ab8455415ae49c6291353fa79d0bc5 Mon Sep 17 00:00:00 2001 From: stoney420 Date: Sun, 29 Jun 2025 00:16:25 +0200 Subject: [PATCH] Update scripts/channel_processor.py --- scripts/channel_processor.py | 454 +++++++++++++++++++++++++---------- 1 file changed, 325 insertions(+), 129 deletions(-) diff --git a/scripts/channel_processor.py b/scripts/channel_processor.py index 80948cb..9519794 100644 --- a/scripts/channel_processor.py +++ b/scripts/channel_processor.py @@ -1,137 +1,333 @@ -def detect_country_from_channel(self, channel_name: str, epg_id: str = "", logo_url: str = "") -> str: - """Enhanced country detection with priority rules and platform detection.""" - # Create cache key - cache_key = f"{channel_name}|{epg_id}|{logo_url}" - if cache_key in self._country_cache: - return self._country_cache[cache_key] +#!/usr/bin/env python3 +""" +IPTV Playlist Generator - FIXED to run from root directory +Enhanced debugging with working imports and method calls +""" + +import logging +import os +import sys +from datetime import datetime +from pathlib import Path + +# FIXED: Change to root directory and add scripts to path +script_dir = Path(__file__).parent +root_dir = script_dir.parent + +# Change working directory to root +os.chdir(root_dir) + +# Add scripts directory to Python path +sys.path.insert(0, str(script_dir)) + +# FIXED: Import our modular components with proper error handling +try: + from config_manager import ConfigManager + from channel_processor import ChannelProcessor + from file_manager import FileManager + from playlist_builder import PlaylistBuilder + from health_checker import HealthChecker + from report_generator import ReportGenerator +except ImportError as e: + print(f"Import error: {e}") + print("Make sure all required modules are in the scripts directory") + sys.exit(1) + +def setup_logging(): + """Setup comprehensive logging.""" + logging.basicConfig( + level=logging.INFO, # Changed back to INFO for cleaner output + format='[%(asctime)s] %(levelname)s: %(message)s', + datefmt='%Y-%m-%d %H:%M:%S', + handlers=[ + logging.FileHandler('playlist_update.log', encoding='utf-8'), + logging.StreamHandler() + ] + ) + +def debug_file_system(): + """Debug the file system to see what files exist.""" + logging.info("=== FILE SYSTEM DEBUG ===") + + # Check current directory + current_dir = os.getcwd() + logging.info(f"Current working directory: {current_dir}") + + # List all files in current directory + try: + files = os.listdir('.') + logging.info(f"Files in current directory: {len(files)} files") + except Exception as e: + logging.error(f"Could not list current directory: {e}") + + # Check for specific files + files_to_check = [ + 'bulk_import.m3u', + 'channels.txt', + 'playlist.m3u' + ] + + for file_path in files_to_check: + if os.path.exists(file_path): + try: + size = os.path.getsize(file_path) + logging.info(f"βœ… Found {file_path} (size: {size} bytes)") + + # If it's the import file, show first few lines + if 'bulk_import.m3u' in file_path and size > 0: + with open(file_path, 'r', encoding='utf-8') as f: + first_lines = [f.readline().strip() for _ in range(3)] + logging.info(f" First 3 lines: {first_lines}") + + except Exception as e: + logging.error(f" Error reading {file_path}: {e}") + else: + logging.info(f"❌ Missing: {file_path}") + + logging.info("=== END FILE SYSTEM DEBUG ===") + +def load_existing_channels(channels_file): + """Load existing channels from channels.txt file.""" + channels = [] + + if not os.path.exists(channels_file): + logging.info(f"No 
existing channels file found: {channels_file}") + return channels + + try: + with open(channels_file, 'r', encoding='utf-8') as f: + content = f.read() - # Combine all text for analysis - all_text = f"{channel_name.lower().strip()} {epg_id.lower().strip()} {logo_url.lower().strip()}" - channel_lower = channel_name.lower() + # Split into channel blocks + blocks = content.split('\n\n') - # PRIORITY 1: EPG ID suffix detection (most reliable) - if ".ca" in epg_id.lower(): - result = "πŸ‡¨πŸ‡¦ Canada" - self._country_cache[cache_key] = result - self.logger.debug(f"Detected {result} for: {channel_name} (EPG: .ca)") - return result - elif ".us" in epg_id.lower(): - result = "πŸ‡ΊπŸ‡Έ United States" - self._country_cache[cache_key] = result - self.logger.debug(f"Detected {result} for: {channel_name} (EPG: .us)") - return result - elif ".uk" in epg_id.lower(): - result = "πŸ‡¬πŸ‡§ United Kingdom" - self._country_cache[cache_key] = result - self.logger.debug(f"Detected {result} for: {channel_name} (EPG: .uk)") - return result - elif ".ph" in epg_id.lower(): - result = "πŸ‡΅πŸ‡­ Philippines" - self._country_cache[cache_key] = result - self.logger.debug(f"Detected {result} for: {channel_name} (EPG: .ph)") - return result - elif ".au" in epg_id.lower(): - result = "πŸ‡¦πŸ‡Ί Australia" - self._country_cache[cache_key] = result - self.logger.debug(f"Detected {result} for: {channel_name} (EPG: .au)") - return result - elif ".jp" in epg_id.lower(): - result = "πŸ‡―πŸ‡΅ Japan" - self._country_cache[cache_key] = result - self.logger.debug(f"Detected {result} for: {channel_name} (EPG: .jp)") - return result - - # PRIORITY 2: Specific channel fixes for misclassified channels - - # Canadian sports channels (TSN series) - if any(x in channel_lower for x in ["tsn 1", "tsn 2", "tsn 3", "tsn 4", "tsn 5", "tsn1", "tsn2", "tsn3", "tsn4", "tsn5"]): - result = "πŸ‡¨πŸ‡¦ Canada" - self._country_cache[cache_key] = result - self.logger.debug(f"Detected {result} for: {channel_name} (TSN Sports)") - return result - - # CBC News Toronto (Canadian) - if "cbc news toronto" in channel_lower: - result = "πŸ‡¨πŸ‡¦ Canada" - self._country_cache[cache_key] = result - self.logger.debug(f"Detected {result} for: {channel_name} (CBC Toronto)") - return result - - # US channels that were misclassified - if any(x in channel_lower for x in ["tv land", "tvland", "we tv", "wetv", "all weddings we tv", "cheaters", "cheers", "christmas 365"]): - result = "πŸ‡ΊπŸ‡Έ United States" - self._country_cache[cache_key] = result - self.logger.debug(f"Detected {result} for: {channel_name} (US Network)") - return result - - # UK shows/channels - if "come dine with me" in channel_lower: - result = "πŸ‡¬πŸ‡§ United Kingdom" - self._country_cache[cache_key] = result - self.logger.debug(f"Detected {result} for: {channel_name} (UK Show)") - return result - - # Philippines news channels - if any(x in channel_lower for x in ["anc global", "anc ph"]): - result = "πŸ‡΅πŸ‡­ Philippines" - self._country_cache[cache_key] = result - self.logger.debug(f"Detected {result} for: {channel_name} (Philippines News)") - return result - - # Japan anime channels - if "animax" in channel_lower: - result = "πŸ‡―πŸ‡΅ Japan" - self._country_cache[cache_key] = result - self.logger.debug(f"Detected {result} for: {channel_name} (Japanese Anime)") - return result - - # PRIORITY 3: Platform-based detection - - # Pluto TV special handling - if "pluto.tv" in all_text or "images.pluto.tv" in all_text or "jmp2.uk/plu-" in all_text: - # Pluto TV regional overrides - pluto_overrides = { - 
"cbc news toronto": "πŸ‡¨πŸ‡¦ Canada", - "come dine with me": "πŸ‡¬πŸ‡§ United Kingdom" - } + for block in blocks: + if not block.strip(): + continue + + # Parse channel block + channel_data = {} + lines = block.strip().split('\n') - for channel_pattern, country in pluto_overrides.items(): - if channel_pattern in channel_lower: - result = country - self._country_cache[cache_key] = result - self.logger.debug(f"Detected {result} for: {channel_name} (Pluto TV Regional)") - return result + for line in lines: + if '=' in line: + key, value = line.split('=', 1) + channel_data[key.strip()] = value.strip() - # Default Pluto TV to US - result = "πŸ‡ΊπŸ‡Έ United States" - self._country_cache[cache_key] = result - self.logger.debug(f"Detected {result} for: {channel_name} (Pluto TV Default)") - return result + if channel_data and channel_data.get('Stream name'): + channels.append(channel_data) - # Plex TV handling (mostly US) - if "plex.tv" in all_text or "provider-static.plex.tv" in all_text: - result = "πŸ‡ΊπŸ‡Έ United States" - self._country_cache[cache_key] = result - self.logger.debug(f"Detected {result} for: {channel_name} (Plex TV)") - return result + logging.info(f"Loaded {len(channels)} existing channels") - # PRIORITY 4: Check prefixes (existing logic) - for country, prefixes in self.config.patterns["country_prefixes"].items(): - for prefix in prefixes: - if prefix in all_text: - self._country_cache[cache_key] = country - self.logger.debug(f"Detected {country} for: {channel_name} (prefix: '{prefix}')") - return country + except Exception as e: + logging.error(f"Error loading existing channels: {e}") + + return channels + +def save_channels_to_file(channels, filename): + """Save channels to file in proper format.""" + try: + with open(filename, 'w', encoding='utf-8') as f: + for i, channel in enumerate(channels): + if i > 0: + f.write("\n\n") + + f.write(f"Group = {channel.get('Group', 'Uncategorized')}\n") + f.write(f"Stream name = {channel.get('Stream name', 'Unknown')}\n") + f.write(f"Logo = {channel.get('Logo', '')}\n") + f.write(f"EPG id = {channel.get('EPG id', '')}\n") + f.write(f"Stream URL = {channel.get('Stream URL', '')}\n") - # PRIORITY 5: Check general patterns (existing logic) - for country, keywords in self.config.patterns["country_patterns"].items(): - for keyword in keywords: - if keyword in all_text: - self._country_cache[cache_key] = country - self.logger.debug(f"Detected {country} for: {channel_name} (keyword: '{keyword}')") - return country + logging.info(f"Successfully saved {len(channels)} channels to {filename}") + return True - # Cache negative result too - self._country_cache[cache_key] = "Uncategorized" - self.logger.debug(f"No country detected for: {channel_name} - marked as Uncategorized") - return "Uncategorized" \ No newline at end of file + except Exception as e: + logging.error(f"Error saving channels to {filename}: {e}") + return False + +def generate_m3u_playlist(channels, playlist_file): + """Generate M3U playlist from channels.""" + try: + with open(playlist_file, 'w', encoding='utf-8') as f: + f.write('#EXTM3U\n') + + valid_channels = 0 + country_stats = {} + + for channel in channels: + stream_name = channel.get('Stream name', '') + group = channel.get('Group', 'Uncategorized') + logo = channel.get('Logo', '') + epg_id = channel.get('EPG id', '') + url = channel.get('Stream URL', '') + + if stream_name and url: + f.write(f'#EXTINF:-1 group-title="{group}"') + if logo: + f.write(f' tvg-logo="{logo}"') + if epg_id: + f.write(f' tvg-id="{epg_id}"') + 
f.write(f',{stream_name}\n') + f.write(f'{url}\n') + valid_channels += 1 + + # Count by country + country_stats[group] = country_stats.get(group, 0) + 1 + + logging.info(f"Generated M3U playlist with {valid_channels} channels across {len(country_stats)} groups") + return valid_channels, country_stats + + except Exception as e: + logging.error(f"Error generating M3U playlist: {e}") + return 0, {} + +def generate_playlist(): + """Main playlist generation function with enhanced debugging.""" + try: + setup_logging() + logging.info("πŸš€ Starting enhanced playlist generation...") + + # Debug file system first + debug_file_system() + + # Initialize configuration + logging.info("πŸ“‹ Initializing configuration...") + config = ConfigManager() + + # Debug config + logging.info(f"Config channels_file: {config.channels_file}") + logging.info(f"Config import_file: {config.import_file}") + + # Initialize processor + processor = ChannelProcessor(config) + + # Statistics tracking + stats = { + 'total_channels': 0, + 'valid_channels': 0, + 'imported_channels': 0, + 'countries_detected': 0, + 'country_distribution': {} + } + + # Step 1: Create backup if channels.txt exists + logging.info("=== STEP 1: Creating backup ===") + if os.path.exists('channels.txt'): + try: + backup_name = f"channels_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt" + import shutil + shutil.copy2('channels.txt', backup_name) + logging.info(f"βœ… Created backup: {backup_name}") + except Exception as e: + logging.warning(f"Could not create backup: {e}") + + # Step 2: Clean existing corrupted entries + logging.info("=== STEP 2: Cleaning corrupted channels ===") + try: + processor.clean_corrupted_channels() + logging.info("βœ… Corruption cleanup completed") + except Exception as e: + logging.warning(f"Corruption cleanup error: {e}") + + # Step 3: Force update existing channels with new country detection + logging.info("=== STEP 3: Updating existing channels with enhanced country detection ===") + try: + processor.update_existing_channels_with_country_detection() + logging.info("βœ… Country detection update completed") + except Exception as e: + logging.warning(f"Country detection update error: {e}") + + # Step 4: Process imports + logging.info("=== STEP 4: Processing imports ===") + + # Check for import file + if os.path.exists('bulk_import.m3u'): + logging.info(f"βœ… Found bulk_import.m3u") + try: + with open('bulk_import.m3u', 'r', encoding='utf-8') as f: + content = f.read() + logging.info(f"Import file has {len(content)} characters") + + if len(content.strip()) > 10: # More than just #EXTM3U + imported_channels = processor.process_import() + stats['imported_channels'] = len(imported_channels) + logging.info(f"βœ… Imported {len(imported_channels)} new channels") + else: + logging.info("Import file is empty, skipping import") + + except Exception as e: + logging.error(f"Error processing import: {e}") + else: + logging.info("No import file found, skipping import") + + # Step 5: Load all channels + logging.info("=== STEP 5: Loading all channels ===") + all_channels = load_existing_channels('channels.txt') + stats['total_channels'] = len(all_channels) + logging.info(f"βœ… Loaded {len(all_channels)} total channels") + + # Step 6: Remove duplicates + logging.info("=== STEP 6: Removing duplicates ===") + try: + unique_channels = processor.remove_duplicates_optimized(all_channels) + duplicates_removed = len(all_channels) - len(unique_channels) + logging.info(f"βœ… After deduplication: {len(unique_channels)} channels 
({duplicates_removed} duplicates removed)") + except Exception as e: + logging.warning(f"Deduplication error: {e}, using original channels") + unique_channels = all_channels + + # Step 7: Sort channels by group and name + logging.info("=== STEP 7: Sorting channels ===") + try: + unique_channels.sort(key=lambda x: (x.get('Group', '').lower(), x.get('Stream name', '').lower())) + logging.info("βœ… Channels sorted by group and name") + except Exception as e: + logging.warning(f"Sorting error: {e}") + + # Step 8: Save updated channels + logging.info("=== STEP 8: Saving updated channels ===") + if save_channels_to_file(unique_channels, 'channels.txt'): + logging.info("βœ… Successfully saved updated channels.txt") + else: + logging.error("❌ Failed to save channels.txt") + + # Step 9: Generate M3U playlist + logging.info("=== STEP 9: Generating M3U playlist ===") + valid_channels, country_stats = generate_m3u_playlist(unique_channels, 'playlist.m3u') + stats['valid_channels'] = valid_channels + stats['country_distribution'] = country_stats + stats['countries_detected'] = len(country_stats) + + # Step 10: Generate summary report + logging.info("=== STEP 10: Generating summary report ===") + + # Show top countries + sorted_countries = sorted(country_stats.items(), key=lambda x: x[1], reverse=True) + logging.info("🌍 Top Countries/Groups:") + for country, count in sorted_countries[:10]: + percentage = (count / valid_channels * 100) if valid_channels > 0 else 0 + logging.info(f" {country}: {count} channels ({percentage:.1f}%)") + + # Final summary + logging.info("πŸŽ‰ PLAYLIST GENERATION COMPLETED SUCCESSFULLY!") + logging.info(f"πŸ“Š Final Statistics:") + logging.info(f" πŸ“Ί Total channels processed: {stats['total_channels']}") + logging.info(f" βœ… Valid channels in playlist: {stats['valid_channels']}") + logging.info(f" πŸ“₯ New channels imported: {stats['imported_channels']}") + logging.info(f" 🌍 Countries/groups detected: {stats['countries_detected']}") + + # Final debug + logging.info("=== FINAL FILE CHECK ===") + debug_file_system() + + return True + + except Exception as e: + logging.error(f"❌ Fatal error in playlist generation: {e}") + import traceback + logging.error(traceback.format_exc()) + return False + +if __name__ == "__main__": + success = generate_playlist() + exit(0 if success else 1) \ No newline at end of file
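
For reference, the channels.txt record format that save_channels_to_file writes and load_existing_channels reads back is a blank-line-separated block of "Key = value" pairs. The channel name and URLs below are illustrative, not taken from any real playlist; only the keys match the ones used throughout the patch:

Group = πŸ‡ΊπŸ‡Έ United States
Stream name = Example News HD
Logo = https://example.com/logo.png
EPG id = example.news.us
Stream URL = https://example.com/stream.m3u8

generate_m3u_playlist turns such a block into exactly two playlist lines:

#EXTINF:-1 group-title="πŸ‡ΊπŸ‡Έ United States" tvg-logo="https://example.com/logo.png" tvg-id="example.news.us",Example News HD
https://example.com/stream.m3u8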
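
Step 4 delegates the actual parsing of bulk_import.m3u to ChannelProcessor.process_import, which is not part of this diff. A minimal sketch of what that parsing could look like, assuming standard #EXTINF attributes and the dict keys used above (the function name parse_m3u_blocks is hypothetical, not the real API):

import re

def parse_m3u_blocks(m3u_text):
    """Illustrative M3U parser; the real logic lives in ChannelProcessor.process_import."""
    channels = []
    current = None
    for raw in m3u_text.splitlines():
        line = raw.strip()
        if line.startswith('#EXTINF'):
            # Pull quoted attributes such as group-title="..." out of the EXTINF line
            attrs = dict(re.findall(r'([\w-]+)="([^"]*)"', line))
            name = line.split(',', 1)[1].strip() if ',' in line else ''
            current = {
                'Group': attrs.get('group-title', 'Uncategorized'),
                'Stream name': name,
                'Logo': attrs.get('tvg-logo', ''),
                'EPG id': attrs.get('tvg-id', ''),
            }
        elif line and not line.startswith('#') and current:
            # The first non-comment line after an EXTINF entry is the stream URL
            current['Stream URL'] = line
            channels.append(current)
            current = None
    return channels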
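
Similarly, remove_duplicates_optimized (Step 6) is only called here, not defined. If it keys on the stream URL, the behaviour would be roughly the sketch below; whether the real implementation also normalises names or URLs before comparing is an assumption:

def dedupe_by_url(channels):
    """Keep the first channel seen for each Stream URL (illustrative sketch only)."""
    seen = set()
    unique = []
    for ch in channels:
        key = ch.get('Stream URL', '').strip().lower()
        if key and key not in seen:
            seen.add(key)
            unique.append(ch)
    return unique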