From c58b851c4a2400606678e5538ae61d75603d0b44 Mon Sep 17 00:00:00 2001
From: stoney420
Date: Fri, 27 Jun 2025 23:29:17 +0200
Subject: [PATCH] Add scripts/channel_processor.py

---
 scripts/channel_processor.py | 406 +++++++++++++++++++++++++++++++++++
 1 file changed, 406 insertions(+)
 create mode 100644 scripts/channel_processor.py

diff --git a/scripts/channel_processor.py b/scripts/channel_processor.py
new file mode 100644
index 0000000..26b7e3a
--- /dev/null
+++ b/scripts/channel_processor.py
@@ -0,0 +1,406 @@
+"""
+Channel Processor - Handles channel processing, country detection, and M3U parsing
+"""
+
+import re
+import os
+import logging
+import shutil
+from datetime import datetime
+from typing import Dict, List, Optional, Set
+
+class ChannelProcessor:
+    """High-performance channel processing with optimizations."""
+
+    def __init__(self, config):
+        self.config = config
+        self.logger = logging.getLogger(__name__)
+
+        # Pre-compile regex patterns for performance
+        self._compile_patterns()
+
+        # Caches for performance
+        self._country_cache: Dict[str, str] = {}
+        self._signature_cache: Dict[str, str] = {}
+
+    def _compile_patterns(self):
+        """Pre-compile regex patterns for better performance."""
+        self.url_fix_patterns = [
+            (re.compile(r'(https?://[^\s#]+)(#EXTINF)'), r'\1\n\2'),
+            (re.compile(r'(\.m3u8?)(#EXTINF)'), r'\1\n\2'),
+            (re.compile(r'([^#\n])#EXTINF'), r'\1\n#EXTINF')
+        ]
+
+        self.extinf_patterns = {
+            'tvg_id': re.compile(r'tvg-id="([^"]*)"'),
+            'tvg_logo': re.compile(r'tvg-logo="([^"]*)"'),
+            'group_title': re.compile(r'group-title="([^"]*)"'),
+            'stream_name': re.compile(r',\s*(.+)$')
+        }
+
+    def detect_country_from_channel(self, channel_name: str, epg_id: str = "", logo_url: str = "") -> str:
+        """Optimized country detection with caching."""
+        # Create cache key
+        cache_key = f"{channel_name}|{epg_id}|{logo_url}"
+        if cache_key in self._country_cache:
+            return self._country_cache[cache_key]
+
+        all_text = f"{channel_name.lower().strip()} {epg_id.lower().strip()} {logo_url.lower().strip()}"
+
+        # Check prefixes first (more specific)
+        for country, prefixes in self.config.patterns["country_prefixes"].items():
+            for prefix in prefixes:
+                if prefix in all_text:
+                    self._country_cache[cache_key] = country
+                    self.logger.debug(f"Detected {country} for: {channel_name} (prefix: '{prefix}')")
+                    return country
+
+        # Check general patterns
+        for country, keywords in self.config.patterns["country_patterns"].items():
+            for keyword in keywords:
+                if keyword in all_text:
+                    self._country_cache[cache_key] = country
+                    self.logger.debug(f"Detected {country} for: {channel_name} (keyword: '{keyword}')")
+                    return country
+
+        # Cache negative result too
+        self._country_cache[cache_key] = "Uncategorized"
+        return "Uncategorized"
+
+    def detect_quality(self, channel_name: str) -> str:
+        """Detect quality with configurable patterns."""
+        name_lower = channel_name.lower()
+
+        for quality, patterns in self.config.patterns["quality_patterns"].items():
+            if any(pattern in name_lower for pattern in patterns):
+                return quality
+
+        return ""
+
+    def is_adult_content(self, channel_name: str) -> bool:
+        """Check for adult content with configurable keywords."""
+        name_lower = channel_name.lower()
+        return any(keyword in name_lower for keyword in self.config.patterns["adult_keywords"])
+
+    def validate_channel(self, channel: Dict) -> tuple:
+        """Enhanced channel validation."""
+        name = channel.get('Stream name', '').strip()
+        url = channel.get('Stream URL', '').strip()
+
+        if not name or not url:
+            return False, "Missing name or URL"
+        if len(name) < self.config.settings.get('min_channel_name_length', 2):
+            return False, "Name too short"
+        if self.config.settings.get('skip_adult_content', True) and self.is_adult_content(name):
+            return False, "Adult content filtered"
+        if not (url.startswith('http') or url.startswith('rtmp')):
+            return False, "Invalid URL"
+
+        return True, "Valid"
+
+    def apply_auto_detection(self, channel: Dict) -> Dict:
+        """Apply country detection and quality tags."""
+        stream_name = channel.get('Stream name', '')
+        epg_id = channel.get('EPG id', '')
+        logo_url = channel.get('Logo', '')
+
+        # Manual overrides first
+        for key, new_group in self.config.group_overrides.items():
+            if key.lower() in stream_name.lower():
+                channel['Group'] = new_group
+                return channel
+
+        # Add quality tag
+        if self.config.settings.get('detect_quality', True):
+            quality = self.detect_quality(stream_name)
+            if quality and quality not in stream_name:
+                channel['Stream name'] = f"{stream_name} [{quality}]"
+
+        # Auto-detect country
+        if self.config.settings.get('auto_detect_country', True):
+            detected_country = self.detect_country_from_channel(stream_name, epg_id, logo_url)
+            channel['Group'] = detected_country
+            self.logger.debug(f"Auto-detected: '{stream_name}' → {detected_country}")
+
+        return channel
+
+    def get_channel_signature(self, channel: Dict) -> str:
+        """Optimized signature generation with caching."""
+        name = channel.get('Stream name', '').strip().lower()
+        url = channel.get('Stream URL', '').strip().lower()
+
+        cache_key = f"{name}|{url}"
+        if cache_key in self._signature_cache:
+            return self._signature_cache[cache_key]
+
+        # Clean name
+        name_clean = re.sub(r'\s+', ' ', name)
+        name_clean = re.sub(r'[^\w\s]', '', name_clean)
+        name_clean = re.sub(r'\b(hd|fhd|4k|uhd|sd)\b', '', name_clean).strip()
+
+        # Clean URL
+        url_clean = url.split('?')[0] if '?' in url else url
+
+        signature = f"{name_clean}|{url_clean}"
+        self._signature_cache[cache_key] = signature
+        return signature
+
+    def remove_duplicates_optimized(self, channels: List[Dict]) -> List[Dict]:
+        """High-performance duplicate removal using sets."""
+        if not self.config.settings.get('remove_duplicates', True):
+            return channels
+
+        seen_signatures: Set[str] = set()
+        unique_channels: List[Dict] = []
+        duplicates = 0
+
+        for channel in channels:
+            signature = self.get_channel_signature(channel)
+            if signature not in seen_signatures:
+                seen_signatures.add(signature)
+                unique_channels.append(channel)
+            else:
+                duplicates += 1
+
+        if duplicates > 0:
+            self.logger.info(f"Removed {duplicates} duplicate channels")
+
+        return unique_channels
+
+    def parse_channel_block(self, block: str) -> Optional[Dict]:
+        """Parse channel block from channels.txt."""
+        channel_data = {}
+        lines = block.strip().split('\n')
+
+        for line in lines:
+            if '=' in line:
+                key, value = line.split('=', 1)
+                channel_data[key.strip()] = value.strip()
+
+        return channel_data if channel_data else None
+
+    def parse_m3u_entry(self, extinf_line: str, url_line: str) -> Dict:
+        """Enhanced M3U entry parsing using pre-compiled patterns."""
+        channel = {}
+
+        try:
+            for field, pattern in self.extinf_patterns.items():
+                match = pattern.search(extinf_line)
+                if field == 'tvg_id':
+                    channel['EPG id'] = match.group(1) if match else ''
+                elif field == 'tvg_logo':
+                    channel['Logo'] = match.group(1) if match else ''
+                elif field == 'group_title':
+                    channel['Group'] = match.group(1) if match else 'Uncategorized'
+                elif field == 'stream_name':
+                    if match:
+                        stream_name = match.group(1).strip()
+                        channel['Stream name'] = re.sub(r'\s+', ' ', stream_name)
+                    else:
+                        channel['Stream name'] = 'Unknown Channel'
+
+            channel['Stream URL'] = url_line.strip()
+
+        except Exception as e:
+            self.logger.warning(f"Error parsing M3U entry: {e}")
+            channel = {
+                'EPG id': '', 'Logo': '', 'Group': 'Uncategorized',
+                'Stream name': 'Parse Error', 'Stream URL': url_line.strip()
+            }
+
+        return channel
+
+    def convert_to_channels_txt_block(self, channel_data: Dict) -> str:
+        """Convert to channels.txt format."""
+        block = []
+        block.append(f"Group = {channel_data.get('Group', 'Uncategorized')}")
+        block.append(f"Stream name = {channel_data.get('Stream name', 'Unknown Channel')}")
+        block.append(f"Logo = {channel_data.get('Logo', '')}")
+        block.append(f"EPG id = {channel_data.get('EPG id', '')}")
+        block.append(f"Stream URL = {channel_data.get('Stream URL', '')}")
+        return "\n".join(block)
+
+    def clean_corrupted_channels(self):
+        """Clean up any corrupted entries in existing channels.txt"""
+        if not os.path.exists(self.config.channels_file):
+            return
+
+        self.logger.info("Cleaning up any corrupted entries in channels.txt...")
+
+        with open(self.config.channels_file, 'r', encoding='utf-8') as f:
+            content = f.read()
+
+        channel_blocks = re.split(r'\n\s*\n+', content.strip())
+        cleaned_channels = []
+        fixed_count = 0
+
+        for block in channel_blocks:
+            if block.strip():
+                channel = self.parse_channel_block(block)
+                if channel:
+                    # Clean corrupted Stream URL
+                    stream_url = channel.get('Stream URL', '')
+                    if '#EXTINF' in stream_url or 'group-title=' in stream_url:
+                        if '#EXTINF' in stream_url:
+                            stream_url = stream_url.split('#EXTINF')[0].strip()
+                        if 'group-title=' in stream_url:
+                            stream_url = stream_url.split('group-title=')[0].strip()
+                        channel['Stream URL'] = stream_url
+                        fixed_count += 1
+                        self.logger.info(f"Fixed corrupted URL for: {channel.get('Stream name')}")
+
+                    # Clean corrupted Logo URL
+                    logo_url = channel.get('Logo', '')
+                    if logo_url and ('group-title=' in logo_url or '#EXTINF' in logo_url):
+                        if 'group-title=' in logo_url:
+                            logo_url = logo_url.split('group-title=')[0].strip()
+                        if '#EXTINF' in logo_url:
+                            logo_url = logo_url.split('#EXTINF')[0].strip()
+                        channel['Logo'] = logo_url
+                        fixed_count += 1
+                        self.logger.info(f"Fixed corrupted logo for: {channel.get('Stream name')}")
+
+                    cleaned_channels.append(channel)
+
+        if fixed_count > 0:
+            self.logger.info(f"Fixed {fixed_count} corrupted entries, rewriting file...")
+
+            # Create backup
+            self._create_backup(self.config.channels_file)
+
+            with open(self.config.channels_file, 'w', encoding='utf-8') as f:
+                for i, channel in enumerate(cleaned_channels):
+                    if i > 0:
+                        f.write("\n\n")
+                    f.write(self.convert_to_channels_txt_block(channel))
+
+            self.logger.info(f"Successfully cleaned and rewrote channels.txt")
+        else:
+            self.logger.info("No corrupted entries found to fix")
+
+    def update_existing_channels_with_country_detection(self):
+        """FIXED: Re-detect countries for existing channels - FORCE UPDATE ALL."""
+        if not os.path.exists(self.config.channels_file):
+            return
+
+        self.logger.info("FORCE re-detecting countries for ALL existing channels...")
+
+        with open(self.config.channels_file, 'r', encoding='utf-8') as f:
+            content = f.read()
+
+        channel_blocks = re.split(r'\n\s*\n+', content.strip())
+        updated_channels = []
+        changes = 0
+
+        for block in channel_blocks:
+            if block.strip():
+                channel = self.parse_channel_block(block)
+                if channel:
+                    old_group = channel.get('Group', 'Uncategorized')
+                    stream_name = channel.get('Stream name', '')
+                    epg_id = channel.get('EPG id', '')
+                    logo_url = channel.get('Logo', '')
+
+                    # FORCE detection for ALL channels
+                    detected = self.detect_country_from_channel(stream_name, epg_id, logo_url)
+
+                    # Always update the group
+                    channel['Group'] = detected
+                    if old_group != detected:
+                        changes += 1
+                        self.logger.info(f"FORCED UPDATE: '{stream_name}' from '{old_group}' to '{detected}'")
+
+                    updated_channels.append(channel)
+
+        if updated_channels:
+            # Create backup and rewrite
+            self._create_backup(self.config.channels_file)
+
+            with open(self.config.channels_file, 'w', encoding='utf-8') as f:
+                for i, channel in enumerate(updated_channels):
+                    if i > 0:
+                        f.write("\n\n")
+                    f.write(self.convert_to_channels_txt_block(channel))
+
+            self.logger.info(f"FORCE updated ALL {len(updated_channels)} channels ({changes} changes made)")
+
+    def process_import(self) -> List[Dict]:
+        """Enhanced M3U import with robust error handling."""
+        if not os.path.exists(self.config.import_file):
+            self.logger.info("No import file found, skipping import")
+            return []
+
+        self.logger.info(f"Processing {self.config.import_file}...")
+
+        imported_channels = []
+
+        try:
+            with open(self.config.import_file, 'r', encoding='utf-8') as f:
+                content = f.read()
+
+            # Pre-process content with optimized regex
+            for pattern, replacement in self.url_fix_patterns:
+                content = pattern.sub(replacement, content)
+
+            lines = content.split('\n')
+            self.logger.info(f"Processing {len(lines)} lines after pre-processing...")
+
+            i = 0
+            while i < len(lines):
+                line = lines[i].strip()
+
+                if line.startswith('#EXTINF:'):
+                    url_line = self._find_url_line(lines, i + 1)
+                    if url_line:
+                        channel = self.parse_m3u_entry(line, url_line)
+                        is_valid, reason = self.validate_channel(channel)
+
+                        if is_valid:
+                            channel = self.apply_auto_detection(channel)
+                            imported_channels.append(channel)
+                        else:
+                            self.logger.debug(f"Filtered channel: {channel.get('Stream name')} - {reason}")
+
+                i += 1
+
+            # Cleanup import file
+            if self.config.settings.get('auto_cleanup_import', True):
+                os.remove(self.config.import_file)
+                self.logger.info("Cleaned up import file")
+
+            self.logger.info(f"Successfully imported {len(imported_channels)} channels")
+            return imported_channels
+
+        except Exception as e:
+            self.logger.error(f"Error processing import: {e}")
+            return []
+
+    def _find_url_line(self, lines: List[str], start_idx: int) -> Optional[str]:
+        """Find the URL line following an EXTINF line."""
+        for j in range(start_idx, min(len(lines), start_idx + 5)):
+            potential_url = lines[j].strip()
+
+            if not potential_url or potential_url.startswith('#'):
+                continue
+
+            # Clean and validate URL
+            if '#EXTINF' in potential_url:
+                potential_url = potential_url.split('#EXTINF')[0].strip()
+
+            if (potential_url.startswith(('http://', 'https://', 'rtmp://')) or
+                potential_url.endswith(('.m3u8', '.ts', '.mp4')) or
+                '/' in potential_url):
+                return potential_url
+
+        return None
+
+    def _create_backup(self, file_path: str):
+        """Create a simple backup."""
+        if os.path.exists(file_path):
+            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
+            backup_name = f"{file_path}.backup.{timestamp}"
+            try:
+                shutil.copy2(file_path, backup_name)
+                self.logger.info(f"Created backup: {backup_name}")
+            except Exception as e:
+                self.logger.warning(f"Could not create backup: {e}")
\ No newline at end of file