From df77df1fe94c89d7fb4cbe9610ac53fa790fcd27 Mon Sep 17 00:00:00 2001
From: stoney420
Date: Sat, 28 Jun 2025 23:36:22 +0200
Subject: [PATCH] Update scripts/channel_processor.py

---
 scripts/channel_processor.py | 530 ++++++++---------------------------
 1 file changed, 114 insertions(+), 416 deletions(-)

diff --git a/scripts/channel_processor.py b/scripts/channel_processor.py
index 2298893..80948cb 100644
--- a/scripts/channel_processor.py
+++ b/scripts/channel_processor.py
@@ -1,53 +1,121 @@
-"""
-Channel Processor - Handles channel processing, country detection, and M3U parsing
-"""
-
-import re
-import os
-import logging
-import shutil
-from datetime import datetime
-from typing import Dict, List, Optional, Set
-
-class ChannelProcessor:
-    """High-performance channel processing with optimizations."""
-
-    def __init__(self, config):
-        self.config = config
-        self.logger = logging.getLogger(__name__)
-
-        # Pre-compile regex patterns for performance
-        self._compile_patterns()
-
-        # Caches for performance
-        self._country_cache: Dict[str, str] = {}
-        self._signature_cache: Dict[str, str] = {}
-
-    def _compile_patterns(self):
-        """Pre-compile regex patterns for better performance."""
-        self.url_fix_patterns = [
-            (re.compile(r'(https?://[^\s#]+)(#EXTINF)'), r'\1\n\2'),
-            (re.compile(r'(\.m3u8?)(#EXTINF)'), r'\1\n\2'),
-            (re.compile(r'([^#\n])#EXTINF'), r'\1\n#EXTINF')
-        ]
-
-        self.extinf_patterns = {
-            'tvg_id': re.compile(r'tvg-id="([^"]*)"'),
-            'tvg_logo': re.compile(r'tvg-logo="([^"]*)"'),
-            'group_title': re.compile(r'group-title="([^"]*)"'),
-            'stream_name': re.compile(r',\s*(.+)$')
-        }
-
-    def detect_country_from_channel(self, channel_name: str, epg_id: str = "", logo_url: str = "") -> str:
-        """Optimized country detection with caching."""
+def detect_country_from_channel(self, channel_name: str, epg_id: str = "", logo_url: str = "") -> str:
+    """Enhanced country detection with priority rules and platform detection."""
     # Create cache key
     cache_key = f"{channel_name}|{epg_id}|{logo_url}"
     if cache_key in self._country_cache:
         return self._country_cache[cache_key]
 
+    # Combine all text for analysis
     all_text = f"{channel_name.lower().strip()} {epg_id.lower().strip()} {logo_url.lower().strip()}"
+    channel_lower = channel_name.lower()
 
-        # Check prefixes first (more specific)
+    # PRIORITY 1: EPG ID suffix detection (most reliable)
+    if epg_id.lower().endswith(".ca"):
+        result = "πŸ‡¨πŸ‡¦ Canada"
+        self._country_cache[cache_key] = result
+        self.logger.debug(f"Detected {result} for: {channel_name} (EPG: .ca)")
+        return result
+    elif epg_id.lower().endswith(".us"):
+        result = "πŸ‡ΊπŸ‡Έ United States"
+        self._country_cache[cache_key] = result
+        self.logger.debug(f"Detected {result} for: {channel_name} (EPG: .us)")
+        return result
+    elif epg_id.lower().endswith(".uk"):
+        result = "πŸ‡¬πŸ‡§ United Kingdom"
+        self._country_cache[cache_key] = result
+        self.logger.debug(f"Detected {result} for: {channel_name} (EPG: .uk)")
+        return result
+    elif epg_id.lower().endswith(".ph"):
+        result = "πŸ‡΅πŸ‡­ Philippines"
+        self._country_cache[cache_key] = result
+        self.logger.debug(f"Detected {result} for: {channel_name} (EPG: .ph)")
+        return result
+    elif epg_id.lower().endswith(".au"):
+        result = "πŸ‡¦πŸ‡Ί Australia"
+        self._country_cache[cache_key] = result
+        self.logger.debug(f"Detected {result} for: {channel_name} (EPG: .au)")
+        return result
+    elif epg_id.lower().endswith(".jp"):
+        result = "πŸ‡―πŸ‡΅ Japan"
+        self._country_cache[cache_key] = result
+        self.logger.debug(f"Detected {result} for: {channel_name} (EPG: .jp)")
+        return result
+
+    # PRIORITY 2: Specific channel fixes for misclassified channels
+
+    # Canadian sports channels (TSN series)
+    if any(x in channel_lower for x in ["tsn 1", "tsn 2", "tsn 3", "tsn 4", "tsn 5", "tsn1", "tsn2", "tsn3", "tsn4", "tsn5"]):
+        result = "πŸ‡¨πŸ‡¦ Canada"
+        self._country_cache[cache_key] = result
+        self.logger.debug(f"Detected {result} for: {channel_name} (TSN Sports)")
+        return result
+
+    # CBC News Toronto (Canadian)
+    if "cbc news toronto" in channel_lower:
+        result = "πŸ‡¨πŸ‡¦ Canada"
+        self._country_cache[cache_key] = result
+        self.logger.debug(f"Detected {result} for: {channel_name} (CBC Toronto)")
+        return result
+
+    # US channels that were misclassified
+    if any(x in channel_lower for x in ["tv land", "tvland", "we tv", "wetv", "all weddings we tv", "cheaters", "cheers", "christmas 365"]):
+        result = "πŸ‡ΊπŸ‡Έ United States"
+        self._country_cache[cache_key] = result
+        self.logger.debug(f"Detected {result} for: {channel_name} (US Network)")
+        return result
+
+    # UK shows/channels
+    if "come dine with me" in channel_lower:
+        result = "πŸ‡¬πŸ‡§ United Kingdom"
+        self._country_cache[cache_key] = result
+        self.logger.debug(f"Detected {result} for: {channel_name} (UK Show)")
+        return result
+
+    # Philippines news channels
+    if any(x in channel_lower for x in ["anc global", "anc ph"]):
+        result = "πŸ‡΅πŸ‡­ Philippines"
+        self._country_cache[cache_key] = result
+        self.logger.debug(f"Detected {result} for: {channel_name} (Philippines News)")
+        return result
+
+    # Japan anime channels
+    if "animax" in channel_lower:
+        result = "πŸ‡―πŸ‡΅ Japan"
+        self._country_cache[cache_key] = result
+        self.logger.debug(f"Detected {result} for: {channel_name} (Japanese Anime)")
+        return result
+
+    # PRIORITY 3: Platform-based detection
+
+    # Pluto TV special handling
+    if "pluto.tv" in all_text or "images.pluto.tv" in all_text or "jmp2.uk/plu-" in all_text:
+        # Pluto TV regional overrides
+        pluto_overrides = {
+            "cbc news toronto": "πŸ‡¨πŸ‡¦ Canada",
+            "come dine with me": "πŸ‡¬πŸ‡§ United Kingdom"
+        }
+
+        for channel_pattern, country in pluto_overrides.items():
+            if channel_pattern in channel_lower:
+                result = country
+                self._country_cache[cache_key] = result
+                self.logger.debug(f"Detected {result} for: {channel_name} (Pluto TV Regional)")
+                return result
+
+        # Default Pluto TV to US
+        result = "πŸ‡ΊπŸ‡Έ United States"
+        self._country_cache[cache_key] = result
+        self.logger.debug(f"Detected {result} for: {channel_name} (Pluto TV Default)")
+        return result
+
+    # Plex TV handling (mostly US)
+    if "plex.tv" in all_text or "provider-static.plex.tv" in all_text:
+        result = "πŸ‡ΊπŸ‡Έ United States"
+        self._country_cache[cache_key] = result
+        self.logger.debug(f"Detected {result} for: {channel_name} (Plex TV)")
+        return result
+
+    # PRIORITY 4: Check prefixes (existing logic)
     for country, prefixes in self.config.patterns["country_prefixes"].items():
         for prefix in prefixes:
             if prefix in all_text:
@@ -55,7 +123,7 @@ class ChannelProcessor:
                 self.logger.debug(f"Detected {country} for: {channel_name} (prefix: '{prefix}')")
                 return country
 
-        # Check general patterns
+    # PRIORITY 5: Check general patterns (existing logic)
     for country, keywords in self.config.patterns["country_patterns"].items():
         for keyword in keywords:
             if keyword in all_text:
@@ -65,375 +133,5 @@ class ChannelProcessor:
     # Cache negative result too
     self._country_cache[cache_key] = "Uncategorized"
-        return "Uncategorized"
-
-    def detect_quality(self, channel_name: str) -> str:
-        """Detect quality with configurable patterns."""
-        name_lower = channel_name.lower()
-
-        for quality, patterns in self.config.patterns["quality_patterns"].items():
-            if any(pattern in name_lower for pattern in patterns):
-                return quality
-
-        return ""
-
-    def is_adult_content(self, channel_name: str) -> bool:
-        """Check for adult content with configurable keywords."""
-        name_lower = channel_name.lower()
-        return any(keyword in name_lower for keyword in self.config.patterns["adult_keywords"])
-
-    def validate_channel(self, channel: Dict) -> tuple:
-        """Enhanced channel validation."""
-        name = channel.get('Stream name', '').strip()
-        url = channel.get('Stream URL', '').strip()
-
-        if not name or not url:
-            return False, "Missing name or URL"
-        if len(name) < self.config.settings.get('min_channel_name_length', 2):
-            return False, "Name too short"
-        if self.config.settings.get('skip_adult_content', True) and self.is_adult_content(name):
-            return False, "Adult content filtered"
-        if not (url.startswith('http') or url.startswith('rtmp')):
-            return False, "Invalid URL"
-
-        return True, "Valid"
-
-    def apply_auto_detection(self, channel: Dict) -> Dict:
-        """Apply country detection and quality tags."""
-        stream_name = channel.get('Stream name', '')
-        epg_id = channel.get('EPG id', '')
-        logo_url = channel.get('Logo', '')
-
-        # Manual overrides first
-        for key, new_group in self.config.group_overrides.items():
-            if key.lower() in stream_name.lower():
-                channel['Group'] = new_group
-                return channel
-
-        # Add quality tag
-        if self.config.settings.get('detect_quality', True):
-            quality = self.detect_quality(stream_name)
-            if quality and quality not in stream_name:
-                channel['Stream name'] = f"{stream_name} [{quality}]"
-
-        # Auto-detect country
-        if self.config.settings.get('auto_detect_country', True):
-            detected_country = self.detect_country_from_channel(stream_name, epg_id, logo_url)
-            channel['Group'] = detected_country
-            self.logger.debug(f"Auto-detected: '{stream_name}' β†’ {detected_country}")
-
-        return channel
-
-    def get_channel_signature(self, channel: Dict) -> str:
-        """Optimized signature generation with caching."""
-        name = channel.get('Stream name', '').strip().lower()
-        url = channel.get('Stream URL', '').strip().lower()
-
-        cache_key = f"{name}|{url}"
-        if cache_key in self._signature_cache:
-            return self._signature_cache[cache_key]
-
-        # Clean name
-        name_clean = re.sub(r'\s+', ' ', name)
-        name_clean = re.sub(r'[^\w\s]', '', name_clean)
-        name_clean = re.sub(r'\b(hd|fhd|4k|uhd|sd)\b', '', name_clean).strip()
-
-        # Clean URL
-        url_clean = url.split('?')[0] if '?' in url else url
-
-        signature = f"{name_clean}|{url_clean}"
-        self._signature_cache[cache_key] = signature
-        return signature
-
-    def remove_duplicates_optimized(self, channels: List[Dict]) -> List[Dict]:
-        """High-performance duplicate removal using sets."""
-        if not self.config.settings.get('remove_duplicates', True):
-            return channels
-
-        seen_signatures: Set[str] = set()
-        unique_channels: List[Dict] = []
-        duplicates = 0
-
-        for channel in channels:
-            signature = self.get_channel_signature(channel)
-            if signature not in seen_signatures:
-                seen_signatures.add(signature)
-                unique_channels.append(channel)
-            else:
-                duplicates += 1
-
-        if duplicates > 0:
-            self.logger.info(f"Removed {duplicates} duplicate channels")
-
-        return unique_channels
-
-    def parse_channel_block(self, block: str) -> Optional[Dict]:
-        """Parse channel block from channels.txt."""
-        channel_data = {}
-        lines = block.strip().split('\n')
-
-        for line in lines:
-            if '=' in line:
-                key, value = line.split('=', 1)
-                channel_data[key.strip()] = value.strip()
-
-        return channel_data if channel_data else None
-
-    def parse_m3u_entry(self, extinf_line: str, url_line: str) -> Dict:
-        """Enhanced M3U entry parsing using pre-compiled patterns."""
-        channel = {}
-
-        try:
-            for field, pattern in self.extinf_patterns.items():
-                match = pattern.search(extinf_line)
-                if field == 'tvg_id':
-                    channel['EPG id'] = match.group(1) if match else ''
-                elif field == 'tvg_logo':
-                    channel['Logo'] = match.group(1) if match else ''
-                elif field == 'group_title':
-                    channel['Group'] = match.group(1) if match else 'Uncategorized'
-                elif field == 'stream_name':
-                    if match:
-                        stream_name = match.group(1).strip()
-                        channel['Stream name'] = re.sub(r'\s+', ' ', stream_name)
-                    else:
-                        channel['Stream name'] = 'Unknown Channel'
-
-            channel['Stream URL'] = url_line.strip()
-
-        except Exception as e:
-            self.logger.warning(f"Error parsing M3U entry: {e}")
-            channel = {
-                'EPG id': '', 'Logo': '', 'Group': 'Uncategorized',
-                'Stream name': 'Parse Error', 'Stream URL': url_line.strip()
-            }
-
-        return channel
-
-    def convert_to_channels_txt_block(self, channel_data: Dict) -> str:
-        """Convert to channels.txt format."""
-        block = []
-        block.append(f"Group = {channel_data.get('Group', 'Uncategorized')}")
-        block.append(f"Stream name = {channel_data.get('Stream name', 'Unknown Channel')}")
-        block.append(f"Logo = {channel_data.get('Logo', '')}")
-        block.append(f"EPG id = {channel_data.get('EPG id', '')}")
-        block.append(f"Stream URL = {channel_data.get('Stream URL', '')}")
-        return "\n".join(block)
-
-    def clean_corrupted_channels(self):
-        """Clean up any corrupted entries in existing channels.txt"""
-        if not os.path.exists(self.config.channels_file):
-            return
-
-        self.logger.info("Cleaning up any corrupted entries in channels.txt...")
-
-        with open(self.config.channels_file, 'r', encoding='utf-8') as f:
-            content = f.read()
-
-        channel_blocks = re.split(r'\n\s*\n+', content.strip())
-        cleaned_channels = []
-        fixed_count = 0
-
-        for block in channel_blocks:
-            if block.strip():
-                channel = self.parse_channel_block(block)
-                if channel:
-                    # Clean corrupted Stream URL
-                    stream_url = channel.get('Stream URL', '')
-                    if '#EXTINF' in stream_url or 'group-title=' in stream_url:
-                        if '#EXTINF' in stream_url:
-                            stream_url = stream_url.split('#EXTINF')[0].strip()
-                        if 'group-title=' in stream_url:
-                            stream_url = stream_url.split('group-title=')[0].strip()
-                        channel['Stream URL'] = stream_url
-                        fixed_count += 1
-                        self.logger.info(f"Fixed corrupted URL for: {channel.get('Stream name')}")
-
-                    # Clean corrupted Logo URL
-                    logo_url = channel.get('Logo', '')
-                    if logo_url and ('group-title=' in logo_url or '#EXTINF' in logo_url):
-                        if 'group-title=' in logo_url:
-                            logo_url = logo_url.split('group-title=')[0].strip()
-                        if '#EXTINF' in logo_url:
-                            logo_url = logo_url.split('#EXTINF')[0].strip()
-                        channel['Logo'] = logo_url
-                        fixed_count += 1
-                        self.logger.info(f"Fixed corrupted logo for: {channel.get('Stream name')}")
-
-                    cleaned_channels.append(channel)
-
-        if fixed_count > 0:
-            self.logger.info(f"Fixed {fixed_count} corrupted entries, rewriting file...")
-
-            # Create backup
-            self._create_backup(self.config.channels_file)
-
-            with open(self.config.channels_file, 'w', encoding='utf-8') as f:
-                for i, channel in enumerate(cleaned_channels):
-                    if i > 0:
-                        f.write("\n\n")
-                    f.write(self.convert_to_channels_txt_block(channel))
-
-            self.logger.info(f"Successfully cleaned and rewrote channels.txt")
-        else:
-            self.logger.info("No corrupted entries found to fix")
name')}") - - # Clean corrupted Logo URL - logo_url = channel.get('Logo', '') - if logo_url and ('group-title=' in logo_url or '#EXTINF' in logo_url): - if 'group-title=' in logo_url: - logo_url = logo_url.split('group-title=')[0].strip() - if '#EXTINF' in logo_url: - logo_url = logo_url.split('#EXTINF')[0].strip() - channel['Logo'] = logo_url - fixed_count += 1 - self.logger.info(f"Fixed corrupted logo for: {channel.get('Stream name')}") - - cleaned_channels.append(channel) - - if fixed_count > 0: - self.logger.info(f"Fixed {fixed_count} corrupted entries, rewriting file...") - - # Create backup - self._create_backup(self.config.channels_file) - - with open(self.config.channels_file, 'w', encoding='utf-8') as f: - for i, channel in enumerate(cleaned_channels): - if i > 0: - f.write("\n\n") - f.write(self.convert_to_channels_txt_block(channel)) - - self.logger.info(f"Successfully cleaned and rewrote channels.txt") - else: - self.logger.info("No corrupted entries found to fix") - - def update_existing_channels_with_country_detection(self): - """FIXED: Re-detect countries for existing channels - FORCE UPDATE ALL.""" - if not os.path.exists(self.config.channels_file): - return - - self.logger.info("FORCE re-detecting countries for ALL existing channels...") - - with open(self.config.channels_file, 'r', encoding='utf-8') as f: - content = f.read() - - channel_blocks = re.split(r'\n\s*\n+', content.strip()) - updated_channels = [] - changes = 0 - - for block in channel_blocks: - if block.strip(): - channel = self.parse_channel_block(block) - if channel: - old_group = channel.get('Group', 'Uncategorized') - stream_name = channel.get('Stream name', '') - epg_id = channel.get('EPG id', '') - logo_url = channel.get('Logo', '') - - # FORCE detection for ALL channels - detected = self.detect_country_from_channel(stream_name, epg_id, logo_url) - - # Always update the group - channel['Group'] = detected - if old_group != detected: - changes += 1 - self.logger.info(f"FORCED UPDATE: '{stream_name}' from '{old_group}' to '{detected}'") - - updated_channels.append(channel) - - if updated_channels: - # Create backup and rewrite - self._create_backup(self.config.channels_file) - - with open(self.config.channels_file, 'w', encoding='utf-8') as f: - for i, channel in enumerate(updated_channels): - if i > 0: - f.write("\n\n") - f.write(self.convert_to_channels_txt_block(channel)) - - self.logger.info(f"FORCE updated ALL {len(updated_channels)} channels ({changes} changes made)") - - def process_import(self) -> List[Dict]: - """Enhanced M3U import with robust error handling.""" - if not os.path.exists(self.config.import_file): - self.logger.info("No import file found, skipping import") - return [] - - self.logger.info(f"Processing {self.config.import_file}...") - - imported_channels = [] - - try: - with open(self.config.import_file, 'r', encoding='utf-8') as f: - content = f.read() - - # Pre-process content with optimized regex - for pattern, replacement in self.url_fix_patterns: - content = pattern.sub(replacement, content) - - lines = content.split('\n') - self.logger.info(f"Processing {len(lines)} lines after pre-processing...") - - i = 0 - while i < len(lines): - line = lines[i].strip() - - if line.startswith('#EXTINF:'): - url_line = self._find_url_line(lines, i + 1) - if url_line: - channel = self.parse_m3u_entry(line, url_line) - is_valid, reason = self.validate_channel(channel) - - if is_valid: - channel = self.apply_auto_detection(channel) - imported_channels.append(channel) - else: - 
self.logger.debug(f"Filtered channel: {channel.get('Stream name')} - {reason}") - - i += 1 - - # Cleanup import file - if self.config.settings.get('auto_cleanup_import', True): - os.remove(self.config.import_file) - self.logger.info("Cleaned up import file") - - # Cleanup import file - CLEAR contents instead of deleting the file - if self.config.settings.get('clear_import_after_processing', True): - try: - # Clear the file contents by writing just the M3U header - with open(self.config.import_file, 'w', encoding='utf-8') as f: - f.write('#EXTM3U\n') # Keep M3U header but remove all channels - self.logger.info(f"βœ… Cleared contents of {self.config.import_file} (file preserved for future imports)") - except Exception as e: - self.logger.warning(f"Could not clear import file contents: {e}") - elif self.config.settings.get('delete_import_file', False): - try: - os.remove(self.config.import_file) - self.logger.info(f"Deleted import file: {self.config.import_file}") - except Exception as e: - self.logger.warning(f"Could not delete import file: {e}") - else: - self.logger.info(f"Import file left unchanged: {self.config.import_file}") - - # CRITICAL: Save the imported channels to channels.txt - if imported_channels: - self.logger.info(f"Saving {len(imported_channels)} imported channels to file...") - - # We need to import FileManager here to avoid circular imports - from file_manager import FileManager - file_manager = FileManager(self.config) - - # Append the new channels to the file - success = file_manager.append_channels(imported_channels) - if success: - self.logger.info(f"βœ… Successfully saved {len(imported_channels)} channels to {self.config.channels_file}") - else: - self.logger.error(f"❌ Failed to save imported channels to file") - - self.logger.info(f"Successfully imported {len(imported_channels)} channels") - return imported_channels - - except Exception as e: - self.logger.error(f"Error processing import: {e}") - return [] - - def _find_url_line(self, lines: List[str], start_idx: int) -> Optional[str]: - """Find the URL line following an EXTINF line.""" - for j in range(start_idx, min(len(lines), start_idx + 5)): - potential_url = lines[j].strip() - - if not potential_url or potential_url.startswith('#'): - continue - - # Clean and validate URL - if '#EXTINF' in potential_url: - potential_url = potential_url.split('#EXTINF')[0].strip() - - if (potential_url.startswith(('http://', 'https://', 'rtmp://')) or - potential_url.endswith(('.m3u8', '.ts', '.mp4')) or - '/' in potential_url): - return potential_url - - return None - - def _create_backup(self, file_path: str): - """Create a simple backup.""" - if os.path.exists(file_path): - timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') - backup_name = f"{file_path}.backup.{timestamp}" - try: - shutil.copy2(file_path, backup_name) - self.logger.info(f"Created backup: {backup_name}") - except Exception as e: - self.logger.warning(f"Could not create backup: {e}") \ No newline at end of file + self.logger.debug(f"No country detected for: {channel_name} - marked as Uncategorized") + return "Uncategorized" \ No newline at end of file