""" Channel Processor - Handles channel processing, country detection, and M3U parsing """ import re import os import logging import shutil from datetime import datetime from typing import Dict, List, Optional, Set class ChannelProcessor: """High-performance channel processing with optimizations.""" def __init__(self, config): self.config = config self.logger = logging.getLogger(__name__) # Pre-compile regex patterns for performance self._compile_patterns() # Caches for performance self._country_cache: Dict[str, str] = {} self._signature_cache: Dict[str, str] = {} def _compile_patterns(self): """Pre-compile regex patterns for better performance.""" self.url_fix_patterns = [ (re.compile(r'(https?://[^\s#]+)(#EXTINF)'), r'\1\n\2'), (re.compile(r'(\.m3u8?)(#EXTINF)'), r'\1\n\2'), (re.compile(r'([^#\n])#EXTINF'), r'\1\n#EXTINF') ] self.extinf_patterns = { 'tvg_id': re.compile(r'tvg-id="([^"]*)"'), 'tvg_logo': re.compile(r'tvg-logo="([^"]*)"'), 'group_title': re.compile(r'group-title="([^"]*)"'), 'stream_name': re.compile(r',\s*(.+)$') } def detect_country_from_channel(self, channel_name: str, epg_id: str = "", logo_url: str = "") -> str: """Optimized country detection with caching.""" # Create cache key cache_key = f"{channel_name}|{epg_id}|{logo_url}" if cache_key in self._country_cache: return self._country_cache[cache_key] all_text = f"{channel_name.lower().strip()} {epg_id.lower().strip()} {logo_url.lower().strip()}" # Check prefixes first (more specific) for country, prefixes in self.config.patterns["country_prefixes"].items(): for prefix in prefixes: if prefix in all_text: self._country_cache[cache_key] = country self.logger.debug(f"Detected {country} for: {channel_name} (prefix: '{prefix}')") return country # Check general patterns for country, keywords in self.config.patterns["country_patterns"].items(): for keyword in keywords: if keyword in all_text: self._country_cache[cache_key] = country self.logger.debug(f"Detected {country} for: {channel_name} (keyword: '{keyword}')") return country # Cache negative result too self._country_cache[cache_key] = "Uncategorized" return "Uncategorized" def detect_quality(self, channel_name: str) -> str: """Detect quality with configurable patterns.""" name_lower = channel_name.lower() for quality, patterns in self.config.patterns["quality_patterns"].items(): if any(pattern in name_lower for pattern in patterns): return quality return "" def is_adult_content(self, channel_name: str) -> bool: """Check for adult content with configurable keywords.""" name_lower = channel_name.lower() return any(keyword in name_lower for keyword in self.config.patterns["adult_keywords"]) def validate_channel(self, channel: Dict) -> tuple: """Enhanced channel validation.""" name = channel.get('Stream name', '').strip() url = channel.get('Stream URL', '').strip() if not name or not url: return False, "Missing name or URL" if len(name) < self.config.settings.get('min_channel_name_length', 2): return False, "Name too short" if self.config.settings.get('skip_adult_content', True) and self.is_adult_content(name): return False, "Adult content filtered" if not (url.startswith('http') or url.startswith('rtmp')): return False, "Invalid URL" return True, "Valid" def apply_auto_detection(self, channel: Dict) -> Dict: """Apply country detection and quality tags.""" stream_name = channel.get('Stream name', '') epg_id = channel.get('EPG id', '') logo_url = channel.get('Logo', '') # Manual overrides first for key, new_group in self.config.group_overrides.items(): if key.lower() in 
    def apply_auto_detection(self, channel: Dict) -> Dict:
        """Apply country detection and quality tags."""
        stream_name = channel.get('Stream name', '')
        epg_id = channel.get('EPG id', '')
        logo_url = channel.get('Logo', '')

        # Manual overrides first (first match wins and returns early)
        for key, new_group in self.config.group_overrides.items():
            if key.lower() in stream_name.lower():
                channel['Group'] = new_group
                return channel

        # Add quality tag
        if self.config.settings.get('detect_quality', True):
            quality = self.detect_quality(stream_name)
            if quality and quality not in stream_name:
                channel['Stream name'] = f"{stream_name} [{quality}]"

        # Auto-detect country
        if self.config.settings.get('auto_detect_country', True):
            detected_country = self.detect_country_from_channel(stream_name, epg_id, logo_url)
            channel['Group'] = detected_country
            self.logger.debug(f"Auto-detected: '{stream_name}' → {detected_country}")

        return channel

    def get_channel_signature(self, channel: Dict) -> str:
        """Optimized signature generation with caching."""
        name = channel.get('Stream name', '').strip().lower()
        url = channel.get('Stream URL', '').strip().lower()

        cache_key = f"{name}|{url}"
        if cache_key in self._signature_cache:
            return self._signature_cache[cache_key]

        # Clean name: collapse whitespace, drop punctuation and quality tags
        name_clean = re.sub(r'\s+', ' ', name)
        name_clean = re.sub(r'[^\w\s]', '', name_clean)
        name_clean = re.sub(r'\b(hd|fhd|4k|uhd|sd)\b', '', name_clean).strip()

        # Clean URL: drop any query string
        url_clean = url.split('?')[0] if '?' in url else url

        signature = f"{name_clean}|{url_clean}"
        self._signature_cache[cache_key] = signature
        return signature

    def remove_duplicates_optimized(self, channels: List[Dict]) -> List[Dict]:
        """High-performance duplicate removal using sets."""
        if not self.config.settings.get('remove_duplicates', True):
            return channels

        seen_signatures: Set[str] = set()
        unique_channels: List[Dict] = []
        duplicates = 0

        for channel in channels:
            signature = self.get_channel_signature(channel)
            if signature not in seen_signatures:
                seen_signatures.add(signature)
                unique_channels.append(channel)
            else:
                duplicates += 1

        if duplicates > 0:
            self.logger.info(f"Removed {duplicates} duplicate channels")

        return unique_channels

    def parse_channel_block(self, block: str) -> Optional[Dict]:
        """Parse a channel block from channels.txt."""
        channel_data = {}
        lines = block.strip().split('\n')

        for line in lines:
            if '=' in line:
                key, value = line.split('=', 1)
                channel_data[key.strip()] = value.strip()

        return channel_data if channel_data else None

    def parse_m3u_entry(self, extinf_line: str, url_line: str) -> Dict:
        """Enhanced M3U entry parsing using pre-compiled patterns."""
        channel = {}

        try:
            for field, pattern in self.extinf_patterns.items():
                match = pattern.search(extinf_line)
                if field == 'tvg_id':
                    channel['EPG id'] = match.group(1) if match else ''
                elif field == 'tvg_logo':
                    channel['Logo'] = match.group(1) if match else ''
                elif field == 'group_title':
                    channel['Group'] = match.group(1) if match else 'Uncategorized'
                elif field == 'stream_name':
                    if match:
                        stream_name = match.group(1).strip()
                        channel['Stream name'] = re.sub(r'\s+', ' ', stream_name)
                    else:
                        channel['Stream name'] = 'Unknown Channel'

            channel['Stream URL'] = url_line.strip()
        except Exception as e:
            self.logger.warning(f"Error parsing M3U entry: {e}")
            channel = {
                'EPG id': '',
                'Logo': '',
                'Group': 'Uncategorized',
                'Stream name': 'Parse Error',
                'Stream URL': url_line.strip()
            }

        return channel

    def convert_to_channels_txt_block(self, channel_data: Dict) -> str:
        """Convert to channels.txt format."""
        block = []
        block.append(f"Group = {channel_data.get('Group', 'Uncategorized')}")
        block.append(f"Stream name = {channel_data.get('Stream name', 'Unknown Channel')}")
        block.append(f"Logo = {channel_data.get('Logo', '')}")
        block.append(f"EPG id = {channel_data.get('EPG id', '')}")
        block.append(f"Stream URL = {channel_data.get('Stream URL', '')}")
        return "\n".join(block)
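
    # Round-trip sketch (hypothetical entry, for illustration):
    #
    #   extinf = ('#EXTINF:-1 tvg-id="cnn.us" tvg-logo="http://example.com/cnn.png" '
    #             'group-title="News",CNN')
    #   ch = proc.parse_m3u_entry(extinf, 'http://example.com/cnn.m3u8')
    #   # ch == {'EPG id': 'cnn.us', 'Logo': 'http://example.com/cnn.png',
    #   #        'Group': 'News', 'Stream name': 'CNN',
    #   #        'Stream URL': 'http://example.com/cnn.m3u8'}
    #   proc.convert_to_channels_txt_block(ch)
    #   # -> "Group = News\nStream name = CNN\n..." (one "Key = value" per line)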
"""Clean up any corrupted entries in existing channels.txt""" if not os.path.exists(self.config.channels_file): return self.logger.info("Cleaning up any corrupted entries in channels.txt...") with open(self.config.channels_file, 'r', encoding='utf-8') as f: content = f.read() channel_blocks = re.split(r'\n\s*\n+', content.strip()) cleaned_channels = [] fixed_count = 0 for block in channel_blocks: if block.strip(): channel = self.parse_channel_block(block) if channel: # Clean corrupted Stream URL stream_url = channel.get('Stream URL', '') if '#EXTINF' in stream_url or 'group-title=' in stream_url: if '#EXTINF' in stream_url: stream_url = stream_url.split('#EXTINF')[0].strip() if 'group-title=' in stream_url: stream_url = stream_url.split('group-title=')[0].strip() channel['Stream URL'] = stream_url fixed_count += 1 self.logger.info(f"Fixed corrupted URL for: {channel.get('Stream name')}") # Clean corrupted Logo URL logo_url = channel.get('Logo', '') if logo_url and ('group-title=' in logo_url or '#EXTINF' in logo_url): if 'group-title=' in logo_url: logo_url = logo_url.split('group-title=')[0].strip() if '#EXTINF' in logo_url: logo_url = logo_url.split('#EXTINF')[0].strip() channel['Logo'] = logo_url fixed_count += 1 self.logger.info(f"Fixed corrupted logo for: {channel.get('Stream name')}") cleaned_channels.append(channel) if fixed_count > 0: self.logger.info(f"Fixed {fixed_count} corrupted entries, rewriting file...") # Create backup self._create_backup(self.config.channels_file) with open(self.config.channels_file, 'w', encoding='utf-8') as f: for i, channel in enumerate(cleaned_channels): if i > 0: f.write("\n\n") f.write(self.convert_to_channels_txt_block(channel)) self.logger.info(f"Successfully cleaned and rewrote channels.txt") else: self.logger.info("No corrupted entries found to fix") def update_existing_channels_with_country_detection(self): """FIXED: Re-detect countries for existing channels - FORCE UPDATE ALL.""" if not os.path.exists(self.config.channels_file): return self.logger.info("FORCE re-detecting countries for ALL existing channels...") with open(self.config.channels_file, 'r', encoding='utf-8') as f: content = f.read() channel_blocks = re.split(r'\n\s*\n+', content.strip()) updated_channels = [] changes = 0 for block in channel_blocks: if block.strip(): channel = self.parse_channel_block(block) if channel: old_group = channel.get('Group', 'Uncategorized') stream_name = channel.get('Stream name', '') epg_id = channel.get('EPG id', '') logo_url = channel.get('Logo', '') # FORCE detection for ALL channels detected = self.detect_country_from_channel(stream_name, epg_id, logo_url) # Always update the group channel['Group'] = detected if old_group != detected: changes += 1 self.logger.info(f"FORCED UPDATE: '{stream_name}' from '{old_group}' to '{detected}'") updated_channels.append(channel) if updated_channels: # Create backup and rewrite self._create_backup(self.config.channels_file) with open(self.config.channels_file, 'w', encoding='utf-8') as f: for i, channel in enumerate(updated_channels): if i > 0: f.write("\n\n") f.write(self.convert_to_channels_txt_block(channel)) self.logger.info(f"FORCE updated ALL {len(updated_channels)} channels ({changes} changes made)") def process_import(self) -> List[Dict]: """Enhanced M3U import with robust error handling.""" if not os.path.exists(self.config.import_file): self.logger.info("No import file found, skipping import") return [] self.logger.info(f"Processing {self.config.import_file}...") imported_channels = [] try: with 
    def process_import(self) -> List[Dict]:
        """Enhanced M3U import with robust error handling."""
        if not os.path.exists(self.config.import_file):
            self.logger.info("No import file found, skipping import")
            return []

        self.logger.info(f"Processing {self.config.import_file}...")
        imported_channels = []

        try:
            with open(self.config.import_file, 'r', encoding='utf-8') as f:
                content = f.read()

            # Pre-process content with optimized regex
            for pattern, replacement in self.url_fix_patterns:
                content = pattern.sub(replacement, content)

            lines = content.split('\n')
            self.logger.info(f"Processing {len(lines)} lines after pre-processing...")

            i = 0
            while i < len(lines):
                line = lines[i].strip()

                if line.startswith('#EXTINF:'):
                    url_line = self._find_url_line(lines, i + 1)
                    if url_line:
                        channel = self.parse_m3u_entry(line, url_line)
                        is_valid, reason = self.validate_channel(channel)

                        if is_valid:
                            channel = self.apply_auto_detection(channel)
                            imported_channels.append(channel)
                        else:
                            self.logger.debug(f"Filtered channel: {channel.get('Stream name')} - {reason}")
                i += 1

            # Cleanup import file
            if self.config.settings.get('auto_cleanup_import', True):
                os.remove(self.config.import_file)
                self.logger.info("Cleaned up import file")

            self.logger.info(f"Successfully imported {len(imported_channels)} channels")
            return imported_channels

        except Exception as e:
            self.logger.error(f"Error processing import: {e}")
            return []

    def _find_url_line(self, lines: List[str], start_idx: int) -> Optional[str]:
        """Find the URL line following an EXTINF line."""
        for j in range(start_idx, min(len(lines), start_idx + 5)):
            potential_url = lines[j].strip()

            if not potential_url or potential_url.startswith('#'):
                continue

            # Clean and validate URL
            if '#EXTINF' in potential_url:
                potential_url = potential_url.split('#EXTINF')[0].strip()

            if (potential_url.startswith(('http://', 'https://', 'rtmp://')) or
                    potential_url.endswith(('.m3u8', '.ts', '.mp4')) or
                    '/' in potential_url):
                return potential_url

        return None

    def _create_backup(self, file_path: str):
        """Create a simple backup."""
        if os.path.exists(file_path):
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            backup_name = f"{file_path}.backup.{timestamp}"
            try:
                shutil.copy2(file_path, backup_name)
                self.logger.info(f"Created backup: {backup_name}")
            except Exception as e:
                self.logger.warning(f"Could not create backup: {e}")
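

if __name__ == "__main__":
    # Minimal end-to-end sketch. The surrounding project presumably supplies a
    # richer config object; this stand-in carries only the attributes this
    # module reads, and every pattern/filename value here is an illustrative
    # assumption, not the project's real configuration.
    from types import SimpleNamespace

    logging.basicConfig(level=logging.INFO)

    config = SimpleNamespace(
        patterns={
            "country_prefixes": {"United Kingdom": ["uk:", "uk|"]},
            "country_patterns": {"United Kingdom": ["bbc", "itv"]},
            "quality_patterns": {"4K": ["4k", "uhd"], "HD": ["hd", "1080"]},
            "adult_keywords": ["xxx"],
        },
        settings={},  # empty dict: every .get(...) falls back to its default
        group_overrides={},
        channels_file="channels.txt",  # hypothetical paths
        import_file="import.m3u",
    )

    processor = ChannelProcessor(config)
    imported = processor.process_import()
    unique = processor.remove_duplicates_optimized(imported)
    print(f"Imported {len(unique)} unique channels")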