diff --git a/scripts/channel_processor.py b/scripts/channel_processor.py index 9519794..80948cb 100644 --- a/scripts/channel_processor.py +++ b/scripts/channel_processor.py @@ -1,333 +1,137 @@ -#!/usr/bin/env python3 -""" -IPTV Playlist Generator - FIXED to run from root directory -Enhanced debugging with working imports and method calls -""" - -import logging -import os -import sys -from datetime import datetime -from pathlib import Path - -# FIXED: Change to root directory and add scripts to path -script_dir = Path(__file__).parent -root_dir = script_dir.parent - -# Change working directory to root -os.chdir(root_dir) - -# Add scripts directory to Python path -sys.path.insert(0, str(script_dir)) - -# FIXED: Import our modular components with proper error handling -try: - from config_manager import ConfigManager - from channel_processor import ChannelProcessor - from file_manager import FileManager - from playlist_builder import PlaylistBuilder - from health_checker import HealthChecker - from report_generator import ReportGenerator -except ImportError as e: - print(f"Import error: {e}") - print("Make sure all required modules are in the scripts directory") - sys.exit(1) - -def setup_logging(): - """Setup comprehensive logging.""" - logging.basicConfig( - level=logging.INFO, # Changed back to INFO for cleaner output - format='[%(asctime)s] %(levelname)s: %(message)s', - datefmt='%Y-%m-%d %H:%M:%S', - handlers=[ - logging.FileHandler('playlist_update.log', encoding='utf-8'), - logging.StreamHandler() - ] - ) - -def debug_file_system(): - """Debug the file system to see what files exist.""" - logging.info("=== FILE SYSTEM DEBUG ===") - - # Check current directory - current_dir = os.getcwd() - logging.info(f"Current working directory: {current_dir}") - - # List all files in current directory - try: - files = os.listdir('.') - logging.info(f"Files in current directory: {len(files)} files") - except Exception as e: - logging.error(f"Could not list current directory: {e}") - - # Check for specific files - files_to_check = [ - 'bulk_import.m3u', - 'channels.txt', - 'playlist.m3u' - ] - - for file_path in files_to_check: - if os.path.exists(file_path): - try: - size = os.path.getsize(file_path) - logging.info(f"βœ… Found {file_path} (size: {size} bytes)") - - # If it's the import file, show first few lines - if 'bulk_import.m3u' in file_path and size > 0: - with open(file_path, 'r', encoding='utf-8') as f: - first_lines = [f.readline().strip() for _ in range(3)] - logging.info(f" First 3 lines: {first_lines}") - - except Exception as e: - logging.error(f" Error reading {file_path}: {e}") - else: - logging.info(f"❌ Missing: {file_path}") - - logging.info("=== END FILE SYSTEM DEBUG ===") - -def load_existing_channels(channels_file): - """Load existing channels from channels.txt file.""" - channels = [] - - if not os.path.exists(channels_file): - logging.info(f"No existing channels file found: {channels_file}") - return channels - - try: - with open(channels_file, 'r', encoding='utf-8') as f: - content = f.read() +def detect_country_from_channel(self, channel_name: str, epg_id: str = "", logo_url: str = "") -> str: + """Enhanced country detection with priority rules and platform detection.""" + # Create cache key + cache_key = f"{channel_name}|{epg_id}|{logo_url}" + if cache_key in self._country_cache: + return self._country_cache[cache_key] - # Split into channel blocks - blocks = content.split('\n\n') + # Combine all text for analysis + all_text = f"{channel_name.lower().strip()} {epg_id.lower().strip()} {logo_url.lower().strip()}" + channel_lower = channel_name.lower() - for block in blocks: - if not block.strip(): - continue - - # Parse channel block - channel_data = {} - lines = block.strip().split('\n') + # PRIORITY 1: EPG ID suffix detection (most reliable) + if ".ca" in epg_id.lower(): + result = "πŸ‡¨πŸ‡¦ Canada" + self._country_cache[cache_key] = result + self.logger.debug(f"Detected {result} for: {channel_name} (EPG: .ca)") + return result + elif ".us" in epg_id.lower(): + result = "πŸ‡ΊπŸ‡Έ United States" + self._country_cache[cache_key] = result + self.logger.debug(f"Detected {result} for: {channel_name} (EPG: .us)") + return result + elif ".uk" in epg_id.lower(): + result = "πŸ‡¬πŸ‡§ United Kingdom" + self._country_cache[cache_key] = result + self.logger.debug(f"Detected {result} for: {channel_name} (EPG: .uk)") + return result + elif ".ph" in epg_id.lower(): + result = "πŸ‡΅πŸ‡­ Philippines" + self._country_cache[cache_key] = result + self.logger.debug(f"Detected {result} for: {channel_name} (EPG: .ph)") + return result + elif ".au" in epg_id.lower(): + result = "πŸ‡¦πŸ‡Ί Australia" + self._country_cache[cache_key] = result + self.logger.debug(f"Detected {result} for: {channel_name} (EPG: .au)") + return result + elif ".jp" in epg_id.lower(): + result = "πŸ‡―πŸ‡΅ Japan" + self._country_cache[cache_key] = result + self.logger.debug(f"Detected {result} for: {channel_name} (EPG: .jp)") + return result + + # PRIORITY 2: Specific channel fixes for misclassified channels + + # Canadian sports channels (TSN series) + if any(x in channel_lower for x in ["tsn 1", "tsn 2", "tsn 3", "tsn 4", "tsn 5", "tsn1", "tsn2", "tsn3", "tsn4", "tsn5"]): + result = "πŸ‡¨πŸ‡¦ Canada" + self._country_cache[cache_key] = result + self.logger.debug(f"Detected {result} for: {channel_name} (TSN Sports)") + return result + + # CBC News Toronto (Canadian) + if "cbc news toronto" in channel_lower: + result = "πŸ‡¨πŸ‡¦ Canada" + self._country_cache[cache_key] = result + self.logger.debug(f"Detected {result} for: {channel_name} (CBC Toronto)") + return result + + # US channels that were misclassified + if any(x in channel_lower for x in ["tv land", "tvland", "we tv", "wetv", "all weddings we tv", "cheaters", "cheers", "christmas 365"]): + result = "πŸ‡ΊπŸ‡Έ United States" + self._country_cache[cache_key] = result + self.logger.debug(f"Detected {result} for: {channel_name} (US Network)") + return result + + # UK shows/channels + if "come dine with me" in channel_lower: + result = "πŸ‡¬πŸ‡§ United Kingdom" + self._country_cache[cache_key] = result + self.logger.debug(f"Detected {result} for: {channel_name} (UK Show)") + return result + + # Philippines news channels + if any(x in channel_lower for x in ["anc global", "anc ph"]): + result = "πŸ‡΅πŸ‡­ Philippines" + self._country_cache[cache_key] = result + self.logger.debug(f"Detected {result} for: {channel_name} (Philippines News)") + return result + + # Japan anime channels + if "animax" in channel_lower: + result = "πŸ‡―πŸ‡΅ Japan" + self._country_cache[cache_key] = result + self.logger.debug(f"Detected {result} for: {channel_name} (Japanese Anime)") + return result + + # PRIORITY 3: Platform-based detection + + # Pluto TV special handling + if "pluto.tv" in all_text or "images.pluto.tv" in all_text or "jmp2.uk/plu-" in all_text: + # Pluto TV regional overrides + pluto_overrides = { + "cbc news toronto": "πŸ‡¨πŸ‡¦ Canada", + "come dine with me": "πŸ‡¬πŸ‡§ United Kingdom" + } - for line in lines: - if '=' in line: - key, value = line.split('=', 1) - channel_data[key.strip()] = value.strip() + for channel_pattern, country in pluto_overrides.items(): + if channel_pattern in channel_lower: + result = country + self._country_cache[cache_key] = result + self.logger.debug(f"Detected {result} for: {channel_name} (Pluto TV Regional)") + return result - if channel_data and channel_data.get('Stream name'): - channels.append(channel_data) + # Default Pluto TV to US + result = "πŸ‡ΊπŸ‡Έ United States" + self._country_cache[cache_key] = result + self.logger.debug(f"Detected {result} for: {channel_name} (Pluto TV Default)") + return result - logging.info(f"Loaded {len(channels)} existing channels") + # Plex TV handling (mostly US) + if "plex.tv" in all_text or "provider-static.plex.tv" in all_text: + result = "πŸ‡ΊπŸ‡Έ United States" + self._country_cache[cache_key] = result + self.logger.debug(f"Detected {result} for: {channel_name} (Plex TV)") + return result - except Exception as e: - logging.error(f"Error loading existing channels: {e}") - - return channels - -def save_channels_to_file(channels, filename): - """Save channels to file in proper format.""" - try: - with open(filename, 'w', encoding='utf-8') as f: - for i, channel in enumerate(channels): - if i > 0: - f.write("\n\n") - - f.write(f"Group = {channel.get('Group', 'Uncategorized')}\n") - f.write(f"Stream name = {channel.get('Stream name', 'Unknown')}\n") - f.write(f"Logo = {channel.get('Logo', '')}\n") - f.write(f"EPG id = {channel.get('EPG id', '')}\n") - f.write(f"Stream URL = {channel.get('Stream URL', '')}\n") + # PRIORITY 4: Check prefixes (existing logic) + for country, prefixes in self.config.patterns["country_prefixes"].items(): + for prefix in prefixes: + if prefix in all_text: + self._country_cache[cache_key] = country + self.logger.debug(f"Detected {country} for: {channel_name} (prefix: '{prefix}')") + return country - logging.info(f"Successfully saved {len(channels)} channels to {filename}") - return True + # PRIORITY 5: Check general patterns (existing logic) + for country, keywords in self.config.patterns["country_patterns"].items(): + for keyword in keywords: + if keyword in all_text: + self._country_cache[cache_key] = country + self.logger.debug(f"Detected {country} for: {channel_name} (keyword: '{keyword}')") + return country - except Exception as e: - logging.error(f"Error saving channels to {filename}: {e}") - return False - -def generate_m3u_playlist(channels, playlist_file): - """Generate M3U playlist from channels.""" - try: - with open(playlist_file, 'w', encoding='utf-8') as f: - f.write('#EXTM3U\n') - - valid_channels = 0 - country_stats = {} - - for channel in channels: - stream_name = channel.get('Stream name', '') - group = channel.get('Group', 'Uncategorized') - logo = channel.get('Logo', '') - epg_id = channel.get('EPG id', '') - url = channel.get('Stream URL', '') - - if stream_name and url: - f.write(f'#EXTINF:-1 group-title="{group}"') - if logo: - f.write(f' tvg-logo="{logo}"') - if epg_id: - f.write(f' tvg-id="{epg_id}"') - f.write(f',{stream_name}\n') - f.write(f'{url}\n') - valid_channels += 1 - - # Count by country - country_stats[group] = country_stats.get(group, 0) + 1 - - logging.info(f"Generated M3U playlist with {valid_channels} channels across {len(country_stats)} groups") - return valid_channels, country_stats - - except Exception as e: - logging.error(f"Error generating M3U playlist: {e}") - return 0, {} - -def generate_playlist(): - """Main playlist generation function with enhanced debugging.""" - try: - setup_logging() - logging.info("πŸš€ Starting enhanced playlist generation...") - - # Debug file system first - debug_file_system() - - # Initialize configuration - logging.info("πŸ“‹ Initializing configuration...") - config = ConfigManager() - - # Debug config - logging.info(f"Config channels_file: {config.channels_file}") - logging.info(f"Config import_file: {config.import_file}") - - # Initialize processor - processor = ChannelProcessor(config) - - # Statistics tracking - stats = { - 'total_channels': 0, - 'valid_channels': 0, - 'imported_channels': 0, - 'countries_detected': 0, - 'country_distribution': {} - } - - # Step 1: Create backup if channels.txt exists - logging.info("=== STEP 1: Creating backup ===") - if os.path.exists('channels.txt'): - try: - backup_name = f"channels_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt" - import shutil - shutil.copy2('channels.txt', backup_name) - logging.info(f"βœ… Created backup: {backup_name}") - except Exception as e: - logging.warning(f"Could not create backup: {e}") - - # Step 2: Clean existing corrupted entries - logging.info("=== STEP 2: Cleaning corrupted channels ===") - try: - processor.clean_corrupted_channels() - logging.info("βœ… Corruption cleanup completed") - except Exception as e: - logging.warning(f"Corruption cleanup error: {e}") - - # Step 3: Force update existing channels with new country detection - logging.info("=== STEP 3: Updating existing channels with enhanced country detection ===") - try: - processor.update_existing_channels_with_country_detection() - logging.info("βœ… Country detection update completed") - except Exception as e: - logging.warning(f"Country detection update error: {e}") - - # Step 4: Process imports - logging.info("=== STEP 4: Processing imports ===") - - # Check for import file - if os.path.exists('bulk_import.m3u'): - logging.info(f"βœ… Found bulk_import.m3u") - try: - with open('bulk_import.m3u', 'r', encoding='utf-8') as f: - content = f.read() - logging.info(f"Import file has {len(content)} characters") - - if len(content.strip()) > 10: # More than just #EXTM3U - imported_channels = processor.process_import() - stats['imported_channels'] = len(imported_channels) - logging.info(f"βœ… Imported {len(imported_channels)} new channels") - else: - logging.info("Import file is empty, skipping import") - - except Exception as e: - logging.error(f"Error processing import: {e}") - else: - logging.info("No import file found, skipping import") - - # Step 5: Load all channels - logging.info("=== STEP 5: Loading all channels ===") - all_channels = load_existing_channels('channels.txt') - stats['total_channels'] = len(all_channels) - logging.info(f"βœ… Loaded {len(all_channels)} total channels") - - # Step 6: Remove duplicates - logging.info("=== STEP 6: Removing duplicates ===") - try: - unique_channels = processor.remove_duplicates_optimized(all_channels) - duplicates_removed = len(all_channels) - len(unique_channels) - logging.info(f"βœ… After deduplication: {len(unique_channels)} channels ({duplicates_removed} duplicates removed)") - except Exception as e: - logging.warning(f"Deduplication error: {e}, using original channels") - unique_channels = all_channels - - # Step 7: Sort channels by group and name - logging.info("=== STEP 7: Sorting channels ===") - try: - unique_channels.sort(key=lambda x: (x.get('Group', '').lower(), x.get('Stream name', '').lower())) - logging.info("βœ… Channels sorted by group and name") - except Exception as e: - logging.warning(f"Sorting error: {e}") - - # Step 8: Save updated channels - logging.info("=== STEP 8: Saving updated channels ===") - if save_channels_to_file(unique_channels, 'channels.txt'): - logging.info("βœ… Successfully saved updated channels.txt") - else: - logging.error("❌ Failed to save channels.txt") - - # Step 9: Generate M3U playlist - logging.info("=== STEP 9: Generating M3U playlist ===") - valid_channels, country_stats = generate_m3u_playlist(unique_channels, 'playlist.m3u') - stats['valid_channels'] = valid_channels - stats['country_distribution'] = country_stats - stats['countries_detected'] = len(country_stats) - - # Step 10: Generate summary report - logging.info("=== STEP 10: Generating summary report ===") - - # Show top countries - sorted_countries = sorted(country_stats.items(), key=lambda x: x[1], reverse=True) - logging.info("🌍 Top Countries/Groups:") - for country, count in sorted_countries[:10]: - percentage = (count / valid_channels * 100) if valid_channels > 0 else 0 - logging.info(f" {country}: {count} channels ({percentage:.1f}%)") - - # Final summary - logging.info("πŸŽ‰ PLAYLIST GENERATION COMPLETED SUCCESSFULLY!") - logging.info(f"πŸ“Š Final Statistics:") - logging.info(f" πŸ“Ί Total channels processed: {stats['total_channels']}") - logging.info(f" βœ… Valid channels in playlist: {stats['valid_channels']}") - logging.info(f" πŸ“₯ New channels imported: {stats['imported_channels']}") - logging.info(f" 🌍 Countries/groups detected: {stats['countries_detected']}") - - # Final debug - logging.info("=== FINAL FILE CHECK ===") - debug_file_system() - - return True - - except Exception as e: - logging.error(f"❌ Fatal error in playlist generation: {e}") - import traceback - logging.error(traceback.format_exc()) - return False - -if __name__ == "__main__": - success = generate_playlist() - exit(0 if success else 1) \ No newline at end of file + # Cache negative result too + self._country_cache[cache_key] = "Uncategorized" + self.logger.debug(f"No country detected for: {channel_name} - marked as Uncategorized") + return "Uncategorized" \ No newline at end of file