All checks were successful
Generate M3U Playlist / build (push) Successful in 1m32s
421 lines
No EOL
18 KiB
Python
421 lines
No EOL
18 KiB
Python
"""
|
|
Channel Processor - Handles channel processing, country detection, and M3U parsing
|
|
"""
|
|
|
|
import re
|
|
import os
|
|
import logging
|
|
import shutil
|
|
from datetime import datetime
|
|
from typing import Dict, List, Optional, Set
|
|
|
|
class ChannelProcessor:
|
|
"""High-performance channel processing with optimizations."""
|
|
|
|
def __init__(self, config):
|
|
self.config = config
|
|
self.logger = logging.getLogger(__name__)
|
|
|
|
# Pre-compile regex patterns for performance
|
|
self._compile_patterns()
|
|
|
|
# Caches for performance
|
|
self._country_cache: Dict[str, str] = {}
|
|
self._signature_cache: Dict[str, str] = {}
|
|
|
|
def _compile_patterns(self):
|
|
"""Pre-compile regex patterns for better performance."""
|
|
self.url_fix_patterns = [
|
|
(re.compile(r'(https?://[^\s#]+)(#EXTINF)'), r'\1\n\2'),
|
|
(re.compile(r'(\.m3u8?)(#EXTINF)'), r'\1\n\2'),
|
|
(re.compile(r'([^#\n])#EXTINF'), r'\1\n#EXTINF')
|
|
]
|
|
|
|
self.extinf_patterns = {
|
|
'tvg_id': re.compile(r'tvg-id="([^"]*)"'),
|
|
'tvg_logo': re.compile(r'tvg-logo="([^"]*)"'),
|
|
'group_title': re.compile(r'group-title="([^"]*)"'),
|
|
'stream_name': re.compile(r',\s*(.+)$')
|
|
}
|
|
|
|
def detect_country_from_channel(self, channel_name: str, epg_id: str = "", logo_url: str = "") -> str:
|
|
"""Optimized country detection with caching."""
|
|
# Create cache key
|
|
cache_key = f"{channel_name}|{epg_id}|{logo_url}"
|
|
if cache_key in self._country_cache:
|
|
return self._country_cache[cache_key]
|
|
|
|
all_text = f"{channel_name.lower().strip()} {epg_id.lower().strip()} {logo_url.lower().strip()}"
|
|
|
|
# Check prefixes first (more specific)
|
|
for country, prefixes in self.config.patterns["country_prefixes"].items():
|
|
for prefix in prefixes:
|
|
if prefix in all_text:
|
|
self._country_cache[cache_key] = country
|
|
self.logger.debug(f"Detected {country} for: {channel_name} (prefix: '{prefix}')")
|
|
return country
|
|
|
|
# Check general patterns
|
|
for country, keywords in self.config.patterns["country_patterns"].items():
|
|
for keyword in keywords:
|
|
if keyword in all_text:
|
|
self._country_cache[cache_key] = country
|
|
self.logger.debug(f"Detected {country} for: {channel_name} (keyword: '{keyword}')")
|
|
return country
|
|
|
|
# Cache negative result too
|
|
self._country_cache[cache_key] = "Uncategorized"
|
|
return "Uncategorized"
|
|
|
|
def detect_quality(self, channel_name: str) -> str:
|
|
"""Detect quality with configurable patterns."""
|
|
name_lower = channel_name.lower()
|
|
|
|
for quality, patterns in self.config.patterns["quality_patterns"].items():
|
|
if any(pattern in name_lower for pattern in patterns):
|
|
return quality
|
|
|
|
return ""
|
|
|
|
def is_adult_content(self, channel_name: str) -> bool:
|
|
"""Check for adult content with configurable keywords."""
|
|
name_lower = channel_name.lower()
|
|
return any(keyword in name_lower for keyword in self.config.patterns["adult_keywords"])
|
|
|
|
def validate_channel(self, channel: Dict) -> tuple:
|
|
"""Enhanced channel validation."""
|
|
name = channel.get('Stream name', '').strip()
|
|
url = channel.get('Stream URL', '').strip()
|
|
|
|
if not name or not url:
|
|
return False, "Missing name or URL"
|
|
if len(name) < self.config.settings.get('min_channel_name_length', 2):
|
|
return False, "Name too short"
|
|
if self.config.settings.get('skip_adult_content', True) and self.is_adult_content(name):
|
|
return False, "Adult content filtered"
|
|
if not (url.startswith('http') or url.startswith('rtmp')):
|
|
return False, "Invalid URL"
|
|
|
|
return True, "Valid"
|
|
|
|
def apply_auto_detection(self, channel: Dict) -> Dict:
|
|
"""Apply country detection and quality tags."""
|
|
stream_name = channel.get('Stream name', '')
|
|
epg_id = channel.get('EPG id', '')
|
|
logo_url = channel.get('Logo', '')
|
|
|
|
# Manual overrides first
|
|
for key, new_group in self.config.group_overrides.items():
|
|
if key.lower() in stream_name.lower():
|
|
channel['Group'] = new_group
|
|
return channel
|
|
|
|
# Add quality tag
|
|
if self.config.settings.get('detect_quality', True):
|
|
quality = self.detect_quality(stream_name)
|
|
if quality and quality not in stream_name:
|
|
channel['Stream name'] = f"{stream_name} [{quality}]"
|
|
|
|
# Auto-detect country
|
|
if self.config.settings.get('auto_detect_country', True):
|
|
detected_country = self.detect_country_from_channel(stream_name, epg_id, logo_url)
|
|
channel['Group'] = detected_country
|
|
self.logger.debug(f"Auto-detected: '{stream_name}' → {detected_country}")
|
|
|
|
return channel
|
|
|
|
def get_channel_signature(self, channel: Dict) -> str:
|
|
"""Optimized signature generation with caching."""
|
|
name = channel.get('Stream name', '').strip().lower()
|
|
url = channel.get('Stream URL', '').strip().lower()
|
|
|
|
cache_key = f"{name}|{url}"
|
|
if cache_key in self._signature_cache:
|
|
return self._signature_cache[cache_key]
|
|
|
|
# Clean name
|
|
name_clean = re.sub(r'\s+', ' ', name)
|
|
name_clean = re.sub(r'[^\w\s]', '', name_clean)
|
|
name_clean = re.sub(r'\b(hd|fhd|4k|uhd|sd)\b', '', name_clean).strip()
|
|
|
|
# Clean URL
|
|
url_clean = url.split('?')[0] if '?' in url else url
|
|
|
|
signature = f"{name_clean}|{url_clean}"
|
|
self._signature_cache[cache_key] = signature
|
|
return signature
|
|
|
|
def remove_duplicates_optimized(self, channels: List[Dict]) -> List[Dict]:
|
|
"""High-performance duplicate removal using sets."""
|
|
if not self.config.settings.get('remove_duplicates', True):
|
|
return channels
|
|
|
|
seen_signatures: Set[str] = set()
|
|
unique_channels: List[Dict] = []
|
|
duplicates = 0
|
|
|
|
for channel in channels:
|
|
signature = self.get_channel_signature(channel)
|
|
if signature not in seen_signatures:
|
|
seen_signatures.add(signature)
|
|
unique_channels.append(channel)
|
|
else:
|
|
duplicates += 1
|
|
|
|
if duplicates > 0:
|
|
self.logger.info(f"Removed {duplicates} duplicate channels")
|
|
|
|
return unique_channels
|
|
|
|
def parse_channel_block(self, block: str) -> Optional[Dict]:
|
|
"""Parse channel block from channels.txt."""
|
|
channel_data = {}
|
|
lines = block.strip().split('\n')
|
|
|
|
for line in lines:
|
|
if '=' in line:
|
|
key, value = line.split('=', 1)
|
|
channel_data[key.strip()] = value.strip()
|
|
|
|
return channel_data if channel_data else None
|
|
|
|
def parse_m3u_entry(self, extinf_line: str, url_line: str) -> Dict:
|
|
"""Enhanced M3U entry parsing using pre-compiled patterns."""
|
|
channel = {}
|
|
|
|
try:
|
|
for field, pattern in self.extinf_patterns.items():
|
|
match = pattern.search(extinf_line)
|
|
if field == 'tvg_id':
|
|
channel['EPG id'] = match.group(1) if match else ''
|
|
elif field == 'tvg_logo':
|
|
channel['Logo'] = match.group(1) if match else ''
|
|
elif field == 'group_title':
|
|
channel['Group'] = match.group(1) if match else 'Uncategorized'
|
|
elif field == 'stream_name':
|
|
if match:
|
|
stream_name = match.group(1).strip()
|
|
channel['Stream name'] = re.sub(r'\s+', ' ', stream_name)
|
|
else:
|
|
channel['Stream name'] = 'Unknown Channel'
|
|
|
|
channel['Stream URL'] = url_line.strip()
|
|
|
|
except Exception as e:
|
|
self.logger.warning(f"Error parsing M3U entry: {e}")
|
|
channel = {
|
|
'EPG id': '', 'Logo': '', 'Group': 'Uncategorized',
|
|
'Stream name': 'Parse Error', 'Stream URL': url_line.strip()
|
|
}
|
|
|
|
return channel
|
|
|
|
def convert_to_channels_txt_block(self, channel_data: Dict) -> str:
|
|
"""Convert to channels.txt format."""
|
|
block = []
|
|
block.append(f"Group = {channel_data.get('Group', 'Uncategorized')}")
|
|
block.append(f"Stream name = {channel_data.get('Stream name', 'Unknown Channel')}")
|
|
block.append(f"Logo = {channel_data.get('Logo', '')}")
|
|
block.append(f"EPG id = {channel_data.get('EPG id', '')}")
|
|
block.append(f"Stream URL = {channel_data.get('Stream URL', '')}")
|
|
return "\n".join(block)
|
|
|
|
def clean_corrupted_channels(self):
|
|
"""Clean up any corrupted entries in existing channels.txt"""
|
|
if not os.path.exists(self.config.channels_file):
|
|
return
|
|
|
|
self.logger.info("Cleaning up any corrupted entries in channels.txt...")
|
|
|
|
with open(self.config.channels_file, 'r', encoding='utf-8') as f:
|
|
content = f.read()
|
|
|
|
channel_blocks = re.split(r'\n\s*\n+', content.strip())
|
|
cleaned_channels = []
|
|
fixed_count = 0
|
|
|
|
for block in channel_blocks:
|
|
if block.strip():
|
|
channel = self.parse_channel_block(block)
|
|
if channel:
|
|
# Clean corrupted Stream URL
|
|
stream_url = channel.get('Stream URL', '')
|
|
if '#EXTINF' in stream_url or 'group-title=' in stream_url:
|
|
if '#EXTINF' in stream_url:
|
|
stream_url = stream_url.split('#EXTINF')[0].strip()
|
|
if 'group-title=' in stream_url:
|
|
stream_url = stream_url.split('group-title=')[0].strip()
|
|
channel['Stream URL'] = stream_url
|
|
fixed_count += 1
|
|
self.logger.info(f"Fixed corrupted URL for: {channel.get('Stream name')}")
|
|
|
|
# Clean corrupted Logo URL
|
|
logo_url = channel.get('Logo', '')
|
|
if logo_url and ('group-title=' in logo_url or '#EXTINF' in logo_url):
|
|
if 'group-title=' in logo_url:
|
|
logo_url = logo_url.split('group-title=')[0].strip()
|
|
if '#EXTINF' in logo_url:
|
|
logo_url = logo_url.split('#EXTINF')[0].strip()
|
|
channel['Logo'] = logo_url
|
|
fixed_count += 1
|
|
self.logger.info(f"Fixed corrupted logo for: {channel.get('Stream name')}")
|
|
|
|
cleaned_channels.append(channel)
|
|
|
|
if fixed_count > 0:
|
|
self.logger.info(f"Fixed {fixed_count} corrupted entries, rewriting file...")
|
|
|
|
# Create backup
|
|
self._create_backup(self.config.channels_file)
|
|
|
|
with open(self.config.channels_file, 'w', encoding='utf-8') as f:
|
|
for i, channel in enumerate(cleaned_channels):
|
|
if i > 0:
|
|
f.write("\n\n")
|
|
f.write(self.convert_to_channels_txt_block(channel))
|
|
|
|
self.logger.info(f"Successfully cleaned and rewrote channels.txt")
|
|
else:
|
|
self.logger.info("No corrupted entries found to fix")
|
|
|
|
def update_existing_channels_with_country_detection(self):
|
|
"""FIXED: Re-detect countries for existing channels - FORCE UPDATE ALL."""
|
|
if not os.path.exists(self.config.channels_file):
|
|
return
|
|
|
|
self.logger.info("FORCE re-detecting countries for ALL existing channels...")
|
|
|
|
with open(self.config.channels_file, 'r', encoding='utf-8') as f:
|
|
content = f.read()
|
|
|
|
channel_blocks = re.split(r'\n\s*\n+', content.strip())
|
|
updated_channels = []
|
|
changes = 0
|
|
|
|
for block in channel_blocks:
|
|
if block.strip():
|
|
channel = self.parse_channel_block(block)
|
|
if channel:
|
|
old_group = channel.get('Group', 'Uncategorized')
|
|
stream_name = channel.get('Stream name', '')
|
|
epg_id = channel.get('EPG id', '')
|
|
logo_url = channel.get('Logo', '')
|
|
|
|
# FORCE detection for ALL channels
|
|
detected = self.detect_country_from_channel(stream_name, epg_id, logo_url)
|
|
|
|
# Always update the group
|
|
channel['Group'] = detected
|
|
if old_group != detected:
|
|
changes += 1
|
|
self.logger.info(f"FORCED UPDATE: '{stream_name}' from '{old_group}' to '{detected}'")
|
|
|
|
updated_channels.append(channel)
|
|
|
|
if updated_channels:
|
|
# Create backup and rewrite
|
|
self._create_backup(self.config.channels_file)
|
|
|
|
with open(self.config.channels_file, 'w', encoding='utf-8') as f:
|
|
for i, channel in enumerate(updated_channels):
|
|
if i > 0:
|
|
f.write("\n\n")
|
|
f.write(self.convert_to_channels_txt_block(channel))
|
|
|
|
self.logger.info(f"FORCE updated ALL {len(updated_channels)} channels ({changes} changes made)")
|
|
|
|
def process_import(self) -> List[Dict]:
|
|
"""Enhanced M3U import with robust error handling."""
|
|
if not os.path.exists(self.config.import_file):
|
|
self.logger.info("No import file found, skipping import")
|
|
return []
|
|
|
|
self.logger.info(f"Processing {self.config.import_file}...")
|
|
|
|
imported_channels = []
|
|
|
|
try:
|
|
with open(self.config.import_file, 'r', encoding='utf-8') as f:
|
|
content = f.read()
|
|
|
|
# Pre-process content with optimized regex
|
|
for pattern, replacement in self.url_fix_patterns:
|
|
content = pattern.sub(replacement, content)
|
|
|
|
lines = content.split('\n')
|
|
self.logger.info(f"Processing {len(lines)} lines after pre-processing...")
|
|
|
|
i = 0
|
|
while i < len(lines):
|
|
line = lines[i].strip()
|
|
|
|
if line.startswith('#EXTINF:'):
|
|
url_line = self._find_url_line(lines, i + 1)
|
|
if url_line:
|
|
channel = self.parse_m3u_entry(line, url_line)
|
|
is_valid, reason = self.validate_channel(channel)
|
|
|
|
if is_valid:
|
|
channel = self.apply_auto_detection(channel)
|
|
imported_channels.append(channel)
|
|
else:
|
|
self.logger.debug(f"Filtered channel: {channel.get('Stream name')} - {reason}")
|
|
|
|
i += 1
|
|
|
|
# Cleanup import file
|
|
if self.config.settings.get('auto_cleanup_import', True):
|
|
os.remove(self.config.import_file)
|
|
self.logger.info("Cleaned up import file")
|
|
|
|
# CRITICAL: Save the imported channels to channels.txt
|
|
if imported_channels:
|
|
self.logger.info(f"Saving {len(imported_channels)} imported channels to file...")
|
|
|
|
# We need to import FileManager here to avoid circular imports
|
|
from file_manager import FileManager
|
|
file_manager = FileManager(self.config)
|
|
|
|
# Append the new channels to the file
|
|
success = file_manager.append_channels(imported_channels)
|
|
if success:
|
|
self.logger.info(f"✅ Successfully saved {len(imported_channels)} channels to {self.config.channels_file}")
|
|
else:
|
|
self.logger.error(f"❌ Failed to save imported channels to file")
|
|
|
|
self.logger.info(f"Successfully imported {len(imported_channels)} channels")
|
|
return imported_channels
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"Error processing import: {e}")
|
|
return []
|
|
|
|
def _find_url_line(self, lines: List[str], start_idx: int) -> Optional[str]:
|
|
"""Find the URL line following an EXTINF line."""
|
|
for j in range(start_idx, min(len(lines), start_idx + 5)):
|
|
potential_url = lines[j].strip()
|
|
|
|
if not potential_url or potential_url.startswith('#'):
|
|
continue
|
|
|
|
# Clean and validate URL
|
|
if '#EXTINF' in potential_url:
|
|
potential_url = potential_url.split('#EXTINF')[0].strip()
|
|
|
|
if (potential_url.startswith(('http://', 'https://', 'rtmp://')) or
|
|
potential_url.endswith(('.m3u8', '.ts', '.mp4')) or
|
|
'/' in potential_url):
|
|
return potential_url
|
|
|
|
return None
|
|
|
|
def _create_backup(self, file_path: str):
|
|
"""Create a simple backup."""
|
|
if os.path.exists(file_path):
|
|
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
|
|
backup_name = f"{file_path}.backup.{timestamp}"
|
|
try:
|
|
shutil.copy2(file_path, backup_name)
|
|
self.logger.info(f"Created backup: {backup_name}")
|
|
except Exception as e:
|
|
self.logger.warning(f"Could not create backup: {e}") |