Update scripts/channel_processor.py

stoney420 2025-06-28 23:36:22 +02:00
parent 465d8a41d5
commit df77df1fe9


@@ -1,53 +1,121 @@
""" def detect_country_from_channel(self, channel_name: str, epg_id: str = "", logo_url: str = "") -> str:
Channel Processor - Handles channel processing, country detection, and M3U parsing """Enhanced country detection with priority rules and platform detection."""
"""
import re
import os
import logging
import shutil
from datetime import datetime
from typing import Dict, List, Optional, Set
class ChannelProcessor:
    """High-performance channel processing with optimizations."""

    def __init__(self, config):
        self.config = config
        self.logger = logging.getLogger(__name__)

        # Pre-compile regex patterns for performance
        self._compile_patterns()

        # Caches for performance
        self._country_cache: Dict[str, str] = {}
        self._signature_cache: Dict[str, str] = {}

    def _compile_patterns(self):
        """Pre-compile regex patterns for better performance."""
        self.url_fix_patterns = [
            (re.compile(r'(https?://[^\s#]+)(#EXTINF)'), r'\1\n\2'),
            (re.compile(r'(\.m3u8?)(#EXTINF)'), r'\1\n\2'),
            (re.compile(r'([^#\n])#EXTINF'), r'\1\n#EXTINF')
        ]
        self.extinf_patterns = {
            'tvg_id': re.compile(r'tvg-id="([^"]*)"'),
            'tvg_logo': re.compile(r'tvg-logo="([^"]*)"'),
            'group_title': re.compile(r'group-title="([^"]*)"'),
            'stream_name': re.compile(r',\s*(.+)$')
        }
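    # Illustrative note (not part of the commit): the url_fix_patterns above repair
    # M3U content where a URL and the next #EXTINF tag ended up glued on one line.
    # A minimal sketch of the effect:
    #   'http://host/a.m3u8#EXTINF:-1,Next'  ->  'http://host/a.m3u8\n#EXTINF:-1,Next'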
    def detect_country_from_channel(self, channel_name: str, epg_id: str = "", logo_url: str = "") -> str:
        """Enhanced country detection with priority rules and platform detection."""
        # Create cache key
        cache_key = f"{channel_name}|{epg_id}|{logo_url}"
        if cache_key in self._country_cache:
            return self._country_cache[cache_key]

        # Combine all text for analysis
        all_text = f"{channel_name.lower().strip()} {epg_id.lower().strip()} {logo_url.lower().strip()}"
        channel_lower = channel_name.lower()
        # PRIORITY 1: EPG ID suffix detection (most reliable)
        if ".ca" in epg_id.lower():
            result = "🇨🇦 Canada"
            self._country_cache[cache_key] = result
            self.logger.debug(f"Detected {result} for: {channel_name} (EPG: .ca)")
            return result
        elif ".us" in epg_id.lower():
            result = "🇺🇸 United States"
            self._country_cache[cache_key] = result
            self.logger.debug(f"Detected {result} for: {channel_name} (EPG: .us)")
            return result
        elif ".uk" in epg_id.lower():
            result = "🇬🇧 United Kingdom"
            self._country_cache[cache_key] = result
            self.logger.debug(f"Detected {result} for: {channel_name} (EPG: .uk)")
            return result
        elif ".ph" in epg_id.lower():
            result = "🇵🇭 Philippines"
            self._country_cache[cache_key] = result
            self.logger.debug(f"Detected {result} for: {channel_name} (EPG: .ph)")
            return result
        elif ".au" in epg_id.lower():
            result = "🇦🇺 Australia"
            self._country_cache[cache_key] = result
            self.logger.debug(f"Detected {result} for: {channel_name} (EPG: .au)")
            return result
        elif ".jp" in epg_id.lower():
            result = "🇯🇵 Japan"
            self._country_cache[cache_key] = result
            self.logger.debug(f"Detected {result} for: {channel_name} (EPG: .jp)")
            return result
        # PRIORITY 2: Specific channel fixes for misclassified channels
        # Canadian sports channels (TSN series)
        if any(x in channel_lower for x in ["tsn 1", "tsn 2", "tsn 3", "tsn 4", "tsn 5", "tsn1", "tsn2", "tsn3", "tsn4", "tsn5"]):
            result = "🇨🇦 Canada"
            self._country_cache[cache_key] = result
            self.logger.debug(f"Detected {result} for: {channel_name} (TSN Sports)")
            return result

        # CBC News Toronto (Canadian)
        if "cbc news toronto" in channel_lower:
            result = "🇨🇦 Canada"
            self._country_cache[cache_key] = result
            self.logger.debug(f"Detected {result} for: {channel_name} (CBC Toronto)")
            return result

        # US channels that were misclassified
        if any(x in channel_lower for x in ["tv land", "tvland", "we tv", "wetv", "all weddings we tv", "cheaters", "cheers", "christmas 365"]):
            result = "🇺🇸 United States"
            self._country_cache[cache_key] = result
            self.logger.debug(f"Detected {result} for: {channel_name} (US Network)")
            return result

        # UK shows/channels
        if "come dine with me" in channel_lower:
            result = "🇬🇧 United Kingdom"
            self._country_cache[cache_key] = result
            self.logger.debug(f"Detected {result} for: {channel_name} (UK Show)")
            return result

        # Philippines news channels
        if any(x in channel_lower for x in ["anc global", "anc ph"]):
            result = "🇵🇭 Philippines"
            self._country_cache[cache_key] = result
            self.logger.debug(f"Detected {result} for: {channel_name} (Philippines News)")
            return result

        # Japan anime channels
        if "animax" in channel_lower:
            result = "🇯🇵 Japan"
            self._country_cache[cache_key] = result
            self.logger.debug(f"Detected {result} for: {channel_name} (Japanese Anime)")
            return result
        # PRIORITY 3: Platform-based detection
        # Pluto TV special handling
        if "pluto.tv" in all_text or "images.pluto.tv" in all_text or "jmp2.uk/plu-" in all_text:
            # Pluto TV regional overrides
            pluto_overrides = {
                "cbc news toronto": "🇨🇦 Canada",
                "come dine with me": "🇬🇧 United Kingdom"
            }
            for channel_pattern, country in pluto_overrides.items():
                if channel_pattern in channel_lower:
                    result = country
                    self._country_cache[cache_key] = result
                    self.logger.debug(f"Detected {result} for: {channel_name} (Pluto TV Regional)")
                    return result

            # Default Pluto TV to US
            result = "🇺🇸 United States"
            self._country_cache[cache_key] = result
            self.logger.debug(f"Detected {result} for: {channel_name} (Pluto TV Default)")
            return result

        # Plex TV handling (mostly US)
        if "plex.tv" in all_text or "provider-static.plex.tv" in all_text:
            result = "🇺🇸 United States"
            self._country_cache[cache_key] = result
            self.logger.debug(f"Detected {result} for: {channel_name} (Plex TV)")
            return result
        # PRIORITY 4: Check prefixes (existing logic)
        for country, prefixes in self.config.patterns["country_prefixes"].items():
            for prefix in prefixes:
                if prefix in all_text:
@@ -55,7 +123,7 @@ class ChannelProcessor:
self.logger.debug(f"Detected {country} for: {channel_name} (prefix: '{prefix}')") self.logger.debug(f"Detected {country} for: {channel_name} (prefix: '{prefix}')")
return country return country
# Check general patterns # PRIORITY 5: Check general patterns (existing logic)
for country, keywords in self.config.patterns["country_patterns"].items(): for country, keywords in self.config.patterns["country_patterns"].items():
for keyword in keywords: for keyword in keywords:
if keyword in all_text: if keyword in all_text:
@@ -65,375 +133,5 @@ class ChannelProcessor:
        # Cache negative result too
        self._country_cache[cache_key] = "Uncategorized"
        self.logger.debug(f"No country detected for: {channel_name} - marked as Uncategorized")
        return "Uncategorized"
    def detect_quality(self, channel_name: str) -> str:
        """Detect quality with configurable patterns."""
        name_lower = channel_name.lower()
        for quality, patterns in self.config.patterns["quality_patterns"].items():
            if any(pattern in name_lower for pattern in patterns):
                return quality
        return ""
    def is_adult_content(self, channel_name: str) -> bool:
        """Check for adult content with configurable keywords."""
        name_lower = channel_name.lower()
        return any(keyword in name_lower for keyword in self.config.patterns["adult_keywords"])
    def validate_channel(self, channel: Dict) -> tuple:
        """Enhanced channel validation."""
        name = channel.get('Stream name', '').strip()
        url = channel.get('Stream URL', '').strip()

        if not name or not url:
            return False, "Missing name or URL"
        if len(name) < self.config.settings.get('min_channel_name_length', 2):
            return False, "Name too short"
        if self.config.settings.get('skip_adult_content', True) and self.is_adult_content(name):
            return False, "Adult content filtered"
        if not (url.startswith('http') or url.startswith('rtmp')):
            return False, "Invalid URL"
        return True, "Valid"
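    # Expected results, sketched (illustrative, not part of the commit):
    #   validate_channel({'Stream name': 'CNN', 'Stream URL': 'http://host/cnn.m3u8'})
    #   -> (True, "Valid")
    #   validate_channel({'Stream name': 'CNN', 'Stream URL': 'ftp://host/cnn'})
    #   -> (False, "Invalid URL")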
    def apply_auto_detection(self, channel: Dict) -> Dict:
        """Apply country detection and quality tags."""
        stream_name = channel.get('Stream name', '')
        epg_id = channel.get('EPG id', '')
        logo_url = channel.get('Logo', '')

        # Manual overrides first
        for key, new_group in self.config.group_overrides.items():
            if key.lower() in stream_name.lower():
                channel['Group'] = new_group
                return channel

        # Add quality tag
        if self.config.settings.get('detect_quality', True):
            quality = self.detect_quality(stream_name)
            if quality and quality not in stream_name:
                channel['Stream name'] = f"{stream_name} [{quality}]"

        # Auto-detect country
        if self.config.settings.get('auto_detect_country', True):
            detected_country = self.detect_country_from_channel(stream_name, epg_id, logo_url)
            channel['Group'] = detected_country
            self.logger.debug(f"Auto-detected: '{stream_name}' → {detected_country}")

        return channel
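    # Illustrative flow (a sketch; assumes no group override matches and the
    # config enables both detection settings):
    #   apply_auto_detection({'Stream name': 'CNN', 'EPG id': 'cnn.us', 'Logo': ''})
    #   -> Group becomes '🇺🇸 United States' via the .us EPG id; a quality tag is
    #      appended only when detected and not already present in the name.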
    def get_channel_signature(self, channel: Dict) -> str:
        """Optimized signature generation with caching."""
        name = channel.get('Stream name', '').strip().lower()
        url = channel.get('Stream URL', '').strip().lower()

        cache_key = f"{name}|{url}"
        if cache_key in self._signature_cache:
            return self._signature_cache[cache_key]

        # Clean name
        name_clean = re.sub(r'\s+', ' ', name)
        name_clean = re.sub(r'[^\w\s]', '', name_clean)
        name_clean = re.sub(r'\b(hd|fhd|4k|uhd|sd)\b', '', name_clean).strip()

        # Clean URL
        url_clean = url.split('?')[0] if '?' in url else url

        signature = f"{name_clean}|{url_clean}"
        self._signature_cache[cache_key] = signature
        return signature
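    # Normalization sketch (illustrative, not part of the commit):
    #   'CNN HD' + 'http://host/live.m3u8?token=abc'
    #   'CNN'    + 'http://host/live.m3u8?token=xyz'
    #   both reduce to the signature 'cnn|http://host/live.m3u8', so the
    #   deduplicator below treats them as one channel.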
    def remove_duplicates_optimized(self, channels: List[Dict]) -> List[Dict]:
        """High-performance duplicate removal using sets."""
        if not self.config.settings.get('remove_duplicates', True):
            return channels

        seen_signatures: Set[str] = set()
        unique_channels: List[Dict] = []
        duplicates = 0

        for channel in channels:
            signature = self.get_channel_signature(channel)
            if signature not in seen_signatures:
                seen_signatures.add(signature)
                unique_channels.append(channel)
            else:
                duplicates += 1

        if duplicates > 0:
            self.logger.info(f"Removed {duplicates} duplicate channels")
        return unique_channels
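    # Usage sketch (illustrative): order is preserved and the first occurrence
    # of each signature wins.
    #   unique = processor.remove_duplicates_optimized(channels)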
    def parse_channel_block(self, block: str) -> Optional[Dict]:
        """Parse channel block from channels.txt."""
        channel_data = {}
        lines = block.strip().split('\n')

        for line in lines:
            if '=' in line:
                key, value = line.split('=', 1)
                channel_data[key.strip()] = value.strip()

        return channel_data if channel_data else None
    def parse_m3u_entry(self, extinf_line: str, url_line: str) -> Dict:
        """Enhanced M3U entry parsing using pre-compiled patterns."""
        channel = {}

        try:
            for field, pattern in self.extinf_patterns.items():
                match = pattern.search(extinf_line)
                if field == 'tvg_id':
                    channel['EPG id'] = match.group(1) if match else ''
                elif field == 'tvg_logo':
                    channel['Logo'] = match.group(1) if match else ''
                elif field == 'group_title':
                    channel['Group'] = match.group(1) if match else 'Uncategorized'
                elif field == 'stream_name':
                    if match:
                        stream_name = match.group(1).strip()
                        channel['Stream name'] = re.sub(r'\s+', ' ', stream_name)
                    else:
                        channel['Stream name'] = 'Unknown Channel'

            channel['Stream URL'] = url_line.strip()
        except Exception as e:
            self.logger.warning(f"Error parsing M3U entry: {e}")
            channel = {
                'EPG id': '', 'Logo': '', 'Group': 'Uncategorized',
                'Stream name': 'Parse Error', 'Stream URL': url_line.strip()
            }

        return channel
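    # Illustrative parse (a sketch, not part of the commit):
    #   parse_m3u_entry('#EXTINF:-1 tvg-id="cnn.us" tvg-logo="http://x/cnn.png" group-title="News",CNN',
    #                   'http://host/cnn.m3u8')
    #   -> {'EPG id': 'cnn.us', 'Logo': 'http://x/cnn.png', 'Group': 'News',
    #       'Stream name': 'CNN', 'Stream URL': 'http://host/cnn.m3u8'}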
    def convert_to_channels_txt_block(self, channel_data: Dict) -> str:
        """Convert to channels.txt format."""
        block = []
        block.append(f"Group = {channel_data.get('Group', 'Uncategorized')}")
        block.append(f"Stream name = {channel_data.get('Stream name', 'Unknown Channel')}")
        block.append(f"Logo = {channel_data.get('Logo', '')}")
        block.append(f"EPG id = {channel_data.get('EPG id', '')}")
        block.append(f"Stream URL = {channel_data.get('Stream URL', '')}")
        return "\n".join(block)
    def clean_corrupted_channels(self):
        """Clean up any corrupted entries in existing channels.txt"""
        if not os.path.exists(self.config.channels_file):
            return

        self.logger.info("Cleaning up any corrupted entries in channels.txt...")

        with open(self.config.channels_file, 'r', encoding='utf-8') as f:
            content = f.read()

        channel_blocks = re.split(r'\n\s*\n+', content.strip())
        cleaned_channels = []
        fixed_count = 0

        for block in channel_blocks:
            if block.strip():
                channel = self.parse_channel_block(block)
                if channel:
                    # Clean corrupted Stream URL
                    stream_url = channel.get('Stream URL', '')
                    if '#EXTINF' in stream_url or 'group-title=' in stream_url:
                        if '#EXTINF' in stream_url:
                            stream_url = stream_url.split('#EXTINF')[0].strip()
                        if 'group-title=' in stream_url:
                            stream_url = stream_url.split('group-title=')[0].strip()
                        channel['Stream URL'] = stream_url
                        fixed_count += 1
                        self.logger.info(f"Fixed corrupted URL for: {channel.get('Stream name')}")

                    # Clean corrupted Logo URL
                    logo_url = channel.get('Logo', '')
                    if logo_url and ('group-title=' in logo_url or '#EXTINF' in logo_url):
                        if 'group-title=' in logo_url:
                            logo_url = logo_url.split('group-title=')[0].strip()
                        if '#EXTINF' in logo_url:
                            logo_url = logo_url.split('#EXTINF')[0].strip()
                        channel['Logo'] = logo_url
                        fixed_count += 1
                        self.logger.info(f"Fixed corrupted logo for: {channel.get('Stream name')}")

                    cleaned_channels.append(channel)

        if fixed_count > 0:
            self.logger.info(f"Fixed {fixed_count} corrupted entries, rewriting file...")

            # Create backup
            self._create_backup(self.config.channels_file)

            with open(self.config.channels_file, 'w', encoding='utf-8') as f:
                for i, channel in enumerate(cleaned_channels):
                    if i > 0:
                        f.write("\n\n")
                    f.write(self.convert_to_channels_txt_block(channel))

            self.logger.info("Successfully cleaned and rewrote channels.txt")
        else:
            self.logger.info("No corrupted entries found to fix")
    def update_existing_channels_with_country_detection(self):
        """FIXED: Re-detect countries for existing channels - FORCE UPDATE ALL."""
        if not os.path.exists(self.config.channels_file):
            return

        self.logger.info("FORCE re-detecting countries for ALL existing channels...")

        with open(self.config.channels_file, 'r', encoding='utf-8') as f:
            content = f.read()

        channel_blocks = re.split(r'\n\s*\n+', content.strip())
        updated_channels = []
        changes = 0

        for block in channel_blocks:
            if block.strip():
                channel = self.parse_channel_block(block)
                if channel:
                    old_group = channel.get('Group', 'Uncategorized')
                    stream_name = channel.get('Stream name', '')
                    epg_id = channel.get('EPG id', '')
                    logo_url = channel.get('Logo', '')

                    # FORCE detection for ALL channels
                    detected = self.detect_country_from_channel(stream_name, epg_id, logo_url)

                    # Always update the group
                    channel['Group'] = detected
                    if old_group != detected:
                        changes += 1
                        self.logger.info(f"FORCED UPDATE: '{stream_name}' from '{old_group}' to '{detected}'")

                    updated_channels.append(channel)

        if updated_channels:
            # Create backup and rewrite
            self._create_backup(self.config.channels_file)

            with open(self.config.channels_file, 'w', encoding='utf-8') as f:
                for i, channel in enumerate(updated_channels):
                    if i > 0:
                        f.write("\n\n")
                    f.write(self.convert_to_channels_txt_block(channel))

            self.logger.info(f"FORCE updated ALL {len(updated_channels)} channels ({changes} changes made)")
    def process_import(self) -> List[Dict]:
        """Enhanced M3U import with robust error handling."""
        if not os.path.exists(self.config.import_file):
            self.logger.info("No import file found, skipping import")
            return []

        self.logger.info(f"Processing {self.config.import_file}...")
        imported_channels = []

        try:
            with open(self.config.import_file, 'r', encoding='utf-8') as f:
                content = f.read()

            # Pre-process content with optimized regex
            for pattern, replacement in self.url_fix_patterns:
                content = pattern.sub(replacement, content)

            lines = content.split('\n')
            self.logger.info(f"Processing {len(lines)} lines after pre-processing...")

            i = 0
            while i < len(lines):
                line = lines[i].strip()
                if line.startswith('#EXTINF:'):
                    url_line = self._find_url_line(lines, i + 1)
                    if url_line:
                        channel = self.parse_m3u_entry(line, url_line)
                        is_valid, reason = self.validate_channel(channel)
                        if is_valid:
                            channel = self.apply_auto_detection(channel)
                            imported_channels.append(channel)
                        else:
                            self.logger.debug(f"Filtered channel: {channel.get('Stream name')} - {reason}")
                i += 1

            # Cleanup import file
            if self.config.settings.get('auto_cleanup_import', True):
                os.remove(self.config.import_file)
                self.logger.info("Cleaned up import file")
            # Cleanup import file - CLEAR contents instead of deleting the file
            if self.config.settings.get('clear_import_after_processing', True):
                try:
                    # Clear the file contents by writing just the M3U header
                    with open(self.config.import_file, 'w', encoding='utf-8') as f:
                        f.write('#EXTM3U\n')  # Keep M3U header but remove all channels
                    self.logger.info(f"✅ Cleared contents of {self.config.import_file} (file preserved for future imports)")
                except Exception as e:
                    self.logger.warning(f"Could not clear import file contents: {e}")
            elif self.config.settings.get('delete_import_file', False):
                try:
                    os.remove(self.config.import_file)
                    self.logger.info(f"Deleted import file: {self.config.import_file}")
                except Exception as e:
                    self.logger.warning(f"Could not delete import file: {e}")
            else:
                self.logger.info(f"Import file left unchanged: {self.config.import_file}")

            # CRITICAL: Save the imported channels to channels.txt
            if imported_channels:
                self.logger.info(f"Saving {len(imported_channels)} imported channels to file...")

                # Import FileManager here to avoid circular imports
                from file_manager import FileManager
                file_manager = FileManager(self.config)

                # Append the new channels to the file
                success = file_manager.append_channels(imported_channels)
                if success:
                    self.logger.info(f"✅ Successfully saved {len(imported_channels)} channels to {self.config.channels_file}")
                else:
                    self.logger.error("❌ Failed to save imported channels to file")

            self.logger.info(f"Successfully imported {len(imported_channels)} channels")
            return imported_channels

        except Exception as e:
            self.logger.error(f"Error processing import: {e}")
            return []
    def _find_url_line(self, lines: List[str], start_idx: int) -> Optional[str]:
        """Find the URL line following an EXTINF line."""
        for j in range(start_idx, min(len(lines), start_idx + 5)):
            potential_url = lines[j].strip()
            if not potential_url or potential_url.startswith('#'):
                continue

            # Clean and validate URL
            if '#EXTINF' in potential_url:
                potential_url = potential_url.split('#EXTINF')[0].strip()

            if (potential_url.startswith(('http://', 'https://', 'rtmp://')) or
                    potential_url.endswith(('.m3u8', '.ts', '.mp4')) or
                    '/' in potential_url):
                return potential_url

        return None
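    # Lookahead sketch (illustrative): scans up to 5 lines past the EXTINF line,
    # skipping blanks and comment lines, e.g.
    #   ['#EXTINF:-1,CNN', '', '# note', 'http://host/cnn.m3u8'] -> 'http://host/cnn.m3u8'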
    def _create_backup(self, file_path: str):
        """Create a simple backup."""
        if os.path.exists(file_path):
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            backup_name = f"{file_path}.backup.{timestamp}"
            try:
                shutil.copy2(file_path, backup_name)
                self.logger.info(f"Created backup: {backup_name}")
            except Exception as e:
                self.logger.warning(f"Could not create backup: {e}")