diff --git a/scripts/generate_playlist.py b/scripts/generate_playlist.py
index ca10d7f..825a94b 100644
--- a/scripts/generate_playlist.py
+++ b/scripts/generate_playlist.py
@@ -1,709 +1,110 @@
-import re
+#!/usr/bin/env python3
+"""
+IPTV Playlist Generator - Main Script
+Modular design for better maintainability and easier development.
+"""
+
+import logging
 import os
-import json
 from datetime import datetime
 
-# --- Configuration ---
-CHANNELS_FILE = 'channels.txt'
-PLAYLIST_FILE = 'playlist.m3u'
-IMPORT_FILE = 'bulk_import.m3u'
-LOG_FILE = 'playlist_update.log'
-SETTINGS_FILE = 'config/settings.json'
-GROUP_OVERRIDES_FILE = 'config/group_overrides.json'
+# Import our modular components
+from scripts.config_manager import ConfigManager
+from scripts.channel_processor import ChannelProcessor
+from scripts.file_manager import FileManager
+from scripts.playlist_builder import PlaylistBuilder
+from scripts.health_checker import HealthChecker
+from scripts.report_generator import ReportGenerator
 
-def log_message(message, level="INFO"):
-    """Logs messages to file and prints them."""
-    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-    formatted_message = f"[{timestamp}] {level}: {message}"
-
-    try:
-        with open(LOG_FILE, 'a', encoding='utf-8') as f:
-            f.write(formatted_message + "\n")
-    except Exception as e:
-        print(f"ERROR: Could not write to log: {e}")
-
-    print(formatted_message)
-
-def load_settings():
-    """Load settings with enhanced defaults."""
-    default_settings = {
-        "remove_duplicates": True,
-        "sort_channels": True,
-        "backup_before_import": True,
-        "auto_cleanup_import": True,
-        "auto_detect_country": True,
-        "detect_quality": True,
-        "skip_adult_content": True,
-        "min_channel_name_length": 2
-    }
-
-    if os.path.exists(SETTINGS_FILE):
-        try:
-            with open(SETTINGS_FILE, 'r', encoding='utf-8') as f:
-                settings = json.load(f)
-                return {**default_settings, **settings}
-        except Exception as e:
-            log_message(f"Could not load settings, using defaults: {e}", "WARNING")
-
-    return default_settings
-
-def load_group_overrides():
-    """Load group overrides."""
-    if os.path.exists(GROUP_OVERRIDES_FILE):
-        try:
-            with open(GROUP_OVERRIDES_FILE, 'r', encoding='utf-8') as f:
-                return json.load(f)
-        except Exception as e:
-            log_message(f"Could not load group overrides: {e}", "WARNING")
-
-    return {}
-
-def detect_country_from_channel(channel_name, epg_id="", logo_url=""):
-    """Comprehensive country detection with 100+ countries."""
-    name_lower = channel_name.lower().strip()
-    epg_lower = epg_id.lower().strip()
-    logo_lower = logo_url.lower().strip()
-    all_text = f"{name_lower} {epg_lower} {logo_lower}"
-
-    log_message(f"Detecting country for: '{channel_name}'", "DEBUG")
-
-    # Comprehensive patterns - shortened for space
-    patterns = {
-        "🇺🇸 United States": ["cbs", "nbc", "abc", "fox", "espn", "cnn", "hbo", " usa", " us ", ".us", "america", "nfl"],
-        "🇬🇧 United Kingdom": ["bbc", "itv", "sky", "channel 4", "e4", " uk", ".uk", "british", "premier league"],
-        "🇨🇦 Canada": ["cbc", "ctv", "global", "canada", "canadian", " ca ", ".ca"],
-        "🇩🇪 Germany": ["ard", "zdf", "rtl", "sat.1", "pro7", "germany", "german", " de ", ".de"],
-        "🇫🇷 France": ["tf1", "france 2", "m6", "canal+", "france", "french", " fr ", ".fr"],
-        "🇪🇸 Spain": ["tve", "antena 3", "telecinco", "spain", "spanish", " es ", ".es"],
-        "🇮🇹 Italy": ["rai", "mediaset", "canale 5", "italy", "italian", " it ", ".it"],
-        "🇳🇱 Netherlands": ["npo", "rtl nl", "netherlands", "dutch", "holland", " nl ", ".nl"],
-        "🇧🇪 Belgium": ["vtm", "één", "canvas", "belgium", "belgian", " be ", ".be"],
-        "🇨🇭 Switzerland": ["srf", "rts", "switzerland", "swiss", " ch ", ".ch"],
-        "🇦🇹 Austria": ["orf", "austria", "austrian", " at ", ".at"],
-        "🇵🇹 Portugal": ["rtp", "sic", "tvi", "portugal", "portuguese", " pt ", ".pt"],
-        "🇮🇪 Ireland": ["rte", "tg4", "ireland", "irish", " ie ", ".ie"],
-        "🇸🇪 Sweden": ["svt", "tv4", "sweden", "swedish", " se ", ".se"],
-        "🇳🇴 Norway": ["nrk", "tv 2 no", "norway", "norwegian", " no ", ".no"],
-        "🇩🇰 Denmark": ["dr", "tv2 dk", "denmark", "danish", " dk ", ".dk"],
-        "🇫🇮 Finland": ["yle", "mtv3", "finland", "finnish", " fi ", ".fi"],
-        "🇮🇸 Iceland": ["ruv", "iceland", "icelandic", " is ", ".is"],
-        "🇷🇺 Russia": ["channel one", "rossiya", "ntv", "russia", "russian", " ru ", ".ru"],
-        "🇵🇱 Poland": ["tvp", "polsat", "tvn", "poland", "polish", " pl ", ".pl"],
-        "🇨🇿 Czech Republic": ["ct", "nova", "prima", "czech", " cz ", ".cz"],
-        "🇸🇰 Slovakia": ["rtvs", "markiza", "slovakia", "slovak", " sk ", ".sk"],
-        "🇭🇺 Hungary": ["mtv hu", "rtl klub", "hungary", "hungarian", " hu ", ".hu"],
-        "🇺🇦 Ukraine": ["1+1", "inter", "ictv", "ukraine", "ukrainian", " ua ", ".ua"],
-        "🇷🇴 Romania": ["tvr", "pro tv", "romania", "romanian", " ro ", ".ro"],
-        "🇧🇬 Bulgaria": ["btv", "nova bg", "bulgaria", "bulgarian", " bg ", ".bg"],
-        "🇭🇷 Croatia": ["hrt", "nova tv hr", "croatia", "croatian", " hr ", ".hr"],
-        "🇷🇸 Serbia": ["rts", "pink", "serbia", "serbian", " rs ", ".rs"],
-        "🇬🇷 Greece": ["ert", "mega gr", "greece", "greek", " gr ", ".gr"],
-        "🇧🇷 Brazil": ["globo", "band", "sbt", "brazil", "brasil", " br ", ".br"],
-        "🇦🇷 Argentina": ["telefe", "canal 13", "argentina", " ar ", ".ar"],
-        "🇲🇽 Mexico": ["televisa", "tv azteca", "mexico", "méxico", " mx ", ".mx"],
-        "🇨🇱 Chile": ["tvn", "mega", "chile", "chilean", " cl ", ".cl"],
-        "🇨🇴 Colombia": ["caracol", "rcn", "colombia", "colombian", " co ", ".co"],
-        "🇵🇪 Peru": ["america tv pe", "peru", "peruvian", " pe ", ".pe"],
-        "🇻🇪 Venezuela": ["venevision", "venezuela", "venezuelan", " ve ", ".ve"],
-        "🇨🇳 China": ["cctv", "phoenix", "china", "chinese", " cn ", ".cn"],
-        "🇯🇵 Japan": ["nhk", "fuji", "tv asahi", "japan", "japanese", " jp ", ".jp"],
-        "🇰🇷 South Korea": ["kbs", "sbs kr", "mbc kr", "korea", "korean", " kr ", ".kr"],
-        "🇰🇵 North Korea": ["kctv", "north korea", "dprk"],
-        "🇹🇼 Taiwan": ["cts", "ctv", "tvbs", "taiwan", "taiwanese", " tw ", ".tw"],
-        "🇭🇰 Hong Kong": ["tvb", "atv", "hong kong", "hongkong", " hk ", ".hk"],
-        "🇹🇭 Thailand": ["ch3", "ch7", "thai pbs", "thailand", "thai", " th ", ".th"],
-        "🇻🇳 Vietnam": ["vtv", "htv", "vietnam", "vietnamese", " vn ", ".vn"],
-        "🇮🇩 Indonesia": ["tvri", "sctv", "rcti", "indonesia", "indonesian", " id ", ".id"],
-        "🇲🇾 Malaysia": ["tv1", "tv3", "astro", "malaysia", "malaysian", " my ", ".my", "my:"],
-        "🇸🇬 Singapore": ["channel 5", "channel 8", "singapore", " sg ", ".sg"],
-        "🇵🇭 Philippines": ["abs-cbn", "gma", "philippines", "filipino", " ph ", ".ph"],
-        "🇮🇳 India": ["star plus", "zee tv", "colors", "sony tv", "india", "indian", "hindi", " in ", ".in"],
-        "🇵🇰 Pakistan": ["ptv", "geo tv", "ary", "pakistan", "pakistani", " pk ", ".pk"],
-        "🇧🇩 Bangladesh": ["btv", "channel i", "bangladesh", "bangladeshi", " bd ", ".bd"],
-        "🇱🇰 Sri Lanka": ["rupavahini", "sirasa", "sri lanka", " lk ", ".lk"],
-        "🇳🇵 Nepal": ["nepal tv", "kantipur", "nepal", "nepali", " np ", ".np"],
-        "🇦🇫 Afghanistan": ["rta", "tolo tv", "afghanistan", "afghan", " af ", ".af"],
-        "🇦🇺 Australia": ["abc au", "seven", "nine", "ten", "australia", "australian", "aussie", " au ", ".au"],
-        "🇳🇿 New Zealand": ["tvnz", "tvnz 1", "tvnz 2", "three nz", "tvnz duke", "new zealand", "kiwi", " nz ", ".nz"],
-        "🇸🇦 Arabic": ["al jazeera", "mbc", "lbc", "dubai tv", "arabic", "arab", "qatar", "dubai", "saudi"],
-        "🇮🇱 Israel": ["kan", "keshet 12", "israel", "israeli", "hebrew", " il ", ".il"],
-        "🇹🇷 Turkey": ["trt", "atv", "kanal d", "turkey", "turkish", " tr ", ".tr", "tr |"],
-        "🇮🇷 Iran": ["irib", "press tv", "iran", "iranian", "persian", " ir ", ".ir"],
-        "🇪🇬 Egypt": ["nile tv", "cbc egypt", "egypt", "egyptian", " eg ", ".eg"],
-        "🇿🇦 South Africa": ["sabc", "etv", "mnet", "south africa", " za ", ".za"],
-        "🇳🇬 Nigeria": ["nta", "channels tv", "nigeria", "nigerian", " ng ", ".ng"]
-    }
-
-    # Check patterns - order matters, more specific first
-    # First check for country prefixes (more specific)
-    country_prefixes = {
-        "🇺🇦 Ukraine": ["ua |"],
-        "🇵🇱 Poland": ["pl |"],
-        "🇹🇷 Turkey": ["tr |"],
-        "🇲🇾 Malaysia": ["my:", "my |"],
-        "🇬🇧 United Kingdom": ["uk:", "uk |"],
-        "🇺🇸 United States": ["us:", "us |"]
-    }
-
-    for country, prefixes in country_prefixes.items():
-        for prefix in prefixes:
-            if prefix in all_text:
-                log_message(f"Detected {country} for: {channel_name} (matched prefix: '{prefix}')", "INFO")
-                return country
-
-    # Then check general patterns
-    for country, keywords in patterns.items():
-        for keyword in keywords:
-            if keyword in all_text:
-                log_message(f"Detected {country} for: {channel_name} (matched: '{keyword}')", "INFO")
-                return country
-
-    # No special categories - everything unmatched goes to Uncategorized
-    log_message(f"No country detected for: {channel_name}", "DEBUG")
-    return "Uncategorized"
-
-def detect_quality(channel_name):
-    """Detect quality from channel name."""
-    name_lower = channel_name.lower()
-    if "4k" in name_lower or "uhd" in name_lower:
-        return "4K"
-    elif "fhd" in name_lower or "1080" in name_lower:
-        return "FHD"
-    elif "hd" in name_lower:
-        return "HD"
-    elif "sd" in name_lower:
-        return "SD"
-    return ""
-
-def is_adult_content(channel_name):
-    """Check for adult content."""
-    adult_keywords = ["xxx", "adult", "porn", "sex", "erotic", "playboy"]
-    return any(keyword in channel_name.lower() for keyword in adult_keywords)
-
-def validate_channel(channel, settings):
-    """Validate channel for import."""
-    name = channel.get('Stream name', '').strip()
-    url = channel.get('Stream URL', '').strip()
-
-    if not name or not url:
-        return False, "Missing name or URL"
-    if len(name) < settings.get('min_channel_name_length', 2):
-        return False, "Name too short"
-    if settings.get('skip_adult_content', True) and is_adult_content(name):
-        return False, "Adult content filtered"
-    if not (url.startswith('http') or url.startswith('rtmp')):
-        return False, "Invalid URL"
-
-    return True, "Valid"
-
-def apply_auto_country_detection(channel, group_overrides, settings):
-    """Apply country detection and quality tags."""
-    stream_name = channel.get('Stream name', '')
-    epg_id = channel.get('EPG id', '')
-    logo_url = channel.get('Logo', '')
-
-    # Manual overrides first
-    for key, new_group in group_overrides.items():
-        if key.lower() in stream_name.lower():
-            channel['Group'] = new_group
-            return channel
-
-    # Add quality tag
-    if settings.get('detect_quality', True):
-        quality = detect_quality(stream_name)
-        if quality and quality not in stream_name:
-            channel['Stream name'] = f"{stream_name} [{quality}]"
-
-    # Auto-detect country
-    if settings.get('auto_detect_country', True):
-        detected_country = detect_country_from_channel(stream_name, epg_id, logo_url)
-        channel['Group'] = detected_country
-        log_message(f"Auto-detected: '{stream_name}' → {detected_country}", "INFO")
-
-    return channel
-
-def parse_channel_block(block):
-    """Parse channel block from channels.txt."""
-    channel_data = {}
-    lines = block.strip().split('\n')
-    for line in lines:
-        if '=' in line:
-            key, value = line.split('=', 1)
-            channel_data[key.strip()] = value.strip()
-    return channel_data
-
-def parse_m3u_entry(extinf_line, url_line):
-    """Parse M3U entry."""
-    channel = {}
-
-    try:
-        tvg_id_match = re.search(r'tvg-id="([^"]*)"', extinf_line)
-        tvg_logo_match = re.search(r'tvg-logo="([^"]*)"', extinf_line)
-        group_title_match = re.search(r'group-title="([^"]*)"', extinf_line)
-
-        channel['EPG id'] = tvg_id_match.group(1) if tvg_id_match else ''
-        channel['Logo'] = tvg_logo_match.group(1) if tvg_logo_match else ''
-        channel['Group'] = group_title_match.group(1) if group_title_match else 'Uncategorized'
-
-        stream_name_match = re.search(r',\s*(.+)$', extinf_line)
-        if stream_name_match:
-            stream_name = stream_name_match.group(1).strip()
-            stream_name = re.sub(r'\s+', ' ', stream_name)
-            channel['Stream name'] = stream_name
-        else:
-            channel['Stream name'] = 'Unknown Channel'
-
-        channel['Stream URL'] = url_line.strip()
-
-    except Exception as e:
-        log_message(f"Error parsing M3U entry: {e}", "WARNING")
-        channel = {
-            'EPG id': '', 'Logo': '', 'Group': 'Uncategorized',
-            'Stream name': 'Parse Error', 'Stream URL': url_line.strip()
-        }
-
-    return channel
-
-def convert_to_channels_txt_block(channel_data):
-    """Convert to channels.txt format."""
-    block = []
-    block.append(f"Group = {channel_data.get('Group', 'Uncategorized')}")
-    block.append(f"Stream name = {channel_data.get('Stream name', 'Unknown Channel')}")
-    block.append(f"Logo = {channel_data.get('Logo', '')}")
-    block.append(f"EPG id = {channel_data.get('EPG id', '')}")
-    block.append(f"Stream URL = {channel_data.get('Stream URL', '')}")
-    return "\n".join(block)
-
-def get_channel_signature(channel):
-    """Create signature for duplicate detection."""
-    name = channel.get('Stream name', '').strip().lower()
-    url = channel.get('Stream URL', '').strip().lower()
-
-    name_clean = re.sub(r'\s+', ' ', name)
-    name_clean = re.sub(r'[^\w\s]', '', name_clean)
-    name_clean = re.sub(r'\b(hd|fhd|4k|uhd|sd)\b', '', name_clean).strip()
-
-    if '?' in url:
-        url_clean = url.split('?')[0]
-    else:
-        url_clean = url
-
-    return f"{name_clean}|{url_clean}"
-
-def remove_duplicates(channels, settings):
-    """Remove duplicate channels."""
-    if not settings.get('remove_duplicates', True):
-        return channels
-
-    seen_signatures = set()
-    unique_channels = []
-    duplicates = []
-
-    for channel in channels:
-        signature = get_channel_signature(channel)
-        if signature not in seen_signatures:
-            seen_signatures.add(signature)
-            unique_channels.append(channel)
-        else:
-            duplicates.append(channel.get('Stream name', 'Unknown'))
-
-    if duplicates:
-        log_message(f"Removed {len(duplicates)} duplicates", "INFO")
-
-    return unique_channels
-
-def clean_corrupted_channels():
-    """Clean up any corrupted entries in existing channels.txt"""
-    if not os.path.exists(CHANNELS_FILE):
-        return
-
-    log_message("Cleaning up any corrupted entries in channels.txt...", "INFO")
-
-    with open(CHANNELS_FILE, 'r', encoding='utf-8') as f:
-        content = f.read()
-
-    channel_blocks = re.split(r'\n\s*\n+', content.strip())
-    cleaned_channels = []
-    fixed_count = 0
-
-    for block in channel_blocks:
-        if block.strip():
-            channel = parse_channel_block(block)
-            if channel:
-                # Clean corrupted Stream URL
-                stream_url = channel.get('Stream URL', '')
-                if '#EXTINF' in stream_url or 'group-title=' in stream_url:
-                    # Extract just the URL part
-                    if '#EXTINF' in stream_url:
-                        stream_url = stream_url.split('#EXTINF')[0].strip()
-                    if 'group-title=' in stream_url:
-                        stream_url = stream_url.split('group-title=')[0].strip()
-                    channel['Stream URL'] = stream_url
-                    fixed_count += 1
-                    log_message(f"Fixed corrupted URL for: {channel.get('Stream name')}", "INFO")
-
-                # Clean corrupted Logo URL
-                logo_url = channel.get('Logo', '')
-                if logo_url and ('group-title=' in logo_url or '#EXTINF' in logo_url):
-                    if 'group-title=' in logo_url:
-                        logo_url = logo_url.split('group-title=')[0].strip()
-                    if '#EXTINF' in logo_url:
-                        logo_url = logo_url.split('#EXTINF')[0].strip()
-                    channel['Logo'] = logo_url
-                    fixed_count += 1
-                    log_message(f"Fixed corrupted logo for: {channel.get('Stream name')}", "INFO")
-
-                cleaned_channels.append(channel)
-
-    if fixed_count > 0:
-        log_message(f"Fixed {fixed_count} corrupted entries, rewriting file...", "INFO")
-
-        # Backup and rewrite
-        backup_name = f"{CHANNELS_FILE}.backup.{datetime.now().strftime('%Y%m%d_%H%M%S')}"
-        try:
-            import shutil
-            shutil.copy2(CHANNELS_FILE, backup_name)
-        except:
-            pass
-
-        with open(CHANNELS_FILE, 'w', encoding='utf-8') as f:
-            for i, channel in enumerate(cleaned_channels):
-                if i > 0:
-                    f.write("\n\n")
-                f.write(convert_to_channels_txt_block(channel))
-
-        log_message(f"Successfully cleaned and rewrote channels.txt", "INFO")
-    else:
-        log_message("No corrupted entries found to fix", "INFO")
-
-def update_existing_channels_with_country_detection():
-    """Re-detect countries for existing channels - FORCE UPDATE ALL."""
-    if not os.path.exists(CHANNELS_FILE):
-        return
-
-    settings = load_settings()
-
-    log_message("FORCE re-detecting countries for ALL existing channels...", "INFO")
-
-    with open(CHANNELS_FILE, 'r', encoding='utf-8') as f:
-        content = f.read()
-
-    channel_blocks = re.split(r'\n\s*\n+', content.strip())
-    updated_channels = []
-    changes = 0
-
-    for block in channel_blocks:
-        if block.strip():
-            channel = parse_channel_block(block)
-            if channel:
-                old_group = channel.get('Group', 'Uncategorized')
-                stream_name = channel.get('Stream name', '')
-                epg_id = channel.get('EPG id', '')
-                logo_url = channel.get('Logo', '')
-
-                # FORCE detection for ALL channels, regardless of current group
-                detected = detect_country_from_channel(stream_name, epg_id, logo_url)
-
-                # Always update the group
-                channel['Group'] = detected
-                if old_group != detected:
-                    changes += 1
-                    log_message(f"FORCED UPDATE: '{stream_name}' from '{old_group}' to '{detected}'", "INFO")
-
-                updated_channels.append(channel)
-
-    if updated_channels:
-        # Always rewrite the file
-        backup_name = f"{CHANNELS_FILE}.backup.{datetime.now().strftime('%Y%m%d_%H%M%S')}"
-        try:
-            import shutil
-            shutil.copy2(CHANNELS_FILE, backup_name)
-            log_message(f"Created backup: {backup_name}", "INFO")
-        except:
-            pass
-
-        with open(CHANNELS_FILE, 'w', encoding='utf-8') as f:
-            for i, channel in enumerate(updated_channels):
-                if i > 0:
-                    f.write("\n\n")
-                f.write(convert_to_channels_txt_block(channel))
-
-        log_message(f"FORCE updated ALL {len(updated_channels)} channels ({changes} changes made)", "INFO")
-
-def process_import():
-    """Process bulk M3U import with ROBUST handling of malformed files."""
-    settings = load_settings()
-    group_overrides = load_group_overrides()
-
-    if not os.path.exists(IMPORT_FILE):
-        log_message(f"No {IMPORT_FILE} found, skipping import", "INFO")
-        return []
-
-    log_message(f"Processing {IMPORT_FILE} with ROBUST parsing...", "INFO")
-
-    stats = {
-        'total_lines': 0, 'extinf_lines': 0, 'parsed': 0, 'valid': 0,
-        'filtered_adult': 0, 'filtered_invalid': 0, 'duplicates': 0,
-        'already_existed': 0, 'final_imported': 0, 'malformed_fixed': 0
-    }
-
-    imported_channels = []
-
-    try:
-        with open(IMPORT_FILE, 'r', encoding='utf-8') as f:
-            content = f.read()
-
-        # Pre-process the content to fix common issues
-        log_message("Pre-processing M3U content with AGGRESSIVE fixing...", "INFO")
-
-        # Fix the most common issue: missing newlines between URL and next EXTINF
-        content = re.sub(r'(https?://[^\s#]+)(#EXTINF)', r'\1\n\2', content)
-        content = re.sub(r'(\.m3u8?)(#EXTINF)', r'\1\n\2', content)
-        content = re.sub(r'(\.ts)(#EXTINF)', r'\1\n\2', content)
-        content = re.sub(r'(\d+)(#EXTINF)', r'\1\n\2', content)
-
-        # Fix missing newlines between different sections
-        content = re.sub(r'(group-title="[^"]*")([A-Z][a-z]+:)', r'\1\n#EXTINF:-1 \2', content)
-
-        # Ensure EXTINF always starts on new line
-        content = re.sub(r'([^#\n])#EXTINF', r'\1\n#EXTINF', content)
-
-        # Split into lines after fixing
-        lines = content.split('\n')
-        stats['total_lines'] = len(lines)
-        log_message(f"Processing {len(lines)} lines after pre-processing...", "INFO")
-
-        i = 0
-        while i < len(lines):
-            line = lines[i].strip()
-
-            if line.startswith('#EXTINF:'):
-                stats['extinf_lines'] += 1
-                extinf_line = line
-                url_line = ""
-
-                # Look for the URL in the next few lines (robust search)
-                j = i + 1
-                while j < len(lines) and j < i + 5:  # Look ahead max 5 lines
-                    potential_url = lines[j].strip()
-
-                    # Skip empty lines and comments
-                    if not potential_url or potential_url.startswith('#'):
-                        j += 1
-                        continue
-
-                    # Clean potential URL
-                    if '#EXTINF' in potential_url:
-                        # Split on #EXTINF and take the first part
-                        url_parts = potential_url.split('#EXTINF')
-                        potential_url = url_parts[0].strip()
-
-                        # Put the EXTINF part back for next iteration
-                        if len(url_parts) > 1:
-                            lines[j] = '#EXTINF' + url_parts[1]
-                            stats['malformed_fixed'] += 1
-
-                    # Check if it looks like a URL
-                    if (potential_url.startswith(('http://', 'https://', 'rtmp://', 'rtmps://')) or
-                        potential_url.endswith(('.m3u8', '.ts', '.mp4')) or
-                        '/' in potential_url):
-                        url_line = potential_url
-                        i = j  # Update our position
-                        break
-
-                    j += 1
-
-                # If we found a URL, process the channel
-                if url_line:
-                    try:
-                        channel = parse_m3u_entry(extinf_line, url_line)
-                        stats['parsed'] += 1
-
-                        # Additional URL cleaning
-                        stream_url = channel.get('Stream URL', '').strip()
-
-                        # Remove any trailing garbage
-                        if ' ' in stream_url:
-                            url_parts = stream_url.split()
-                            for part in url_parts:
-                                if (part.startswith(('http://', 'https://', 'rtmp://')) or
-                                    part.endswith(('.m3u8', '.ts', '.mp4'))):
-                                    channel['Stream URL'] = part
-                                    break
-
-                        # Validate the channel
-                        is_valid, reason = validate_channel(channel, settings)
-                        if not is_valid:
-                            if "adult" in reason.lower():
-                                stats['filtered_adult'] += 1
-                            else:
-                                stats['filtered_invalid'] += 1
-                            log_message(f"Filtered: {channel.get('Stream name')} - {reason}", "DEBUG")
-                            i += 1
-                            continue
-
-                        # Apply country detection
-                        channel = apply_auto_country_detection(channel, group_overrides, settings)
-                        imported_channels.append(channel)
-                        stats['valid'] += 1
-
-                        log_message(f"Successfully imported: {channel.get('Stream name')} → {channel.get('Group')}", "DEBUG")
-
-                    except Exception as e:
-                        log_message(f"Error processing channel: {e}", "WARNING")
-                        i += 1
-                        continue
-                else:
-                    log_message(f"No URL found for: {extinf_line[:50]}...", "WARNING")
-                    i += 1
-                    continue
-
-            i += 1
-
-        # Continue with duplicate removal and file writing...
-        if imported_channels:
-            log_message(f"Pre-duplicate removal: {len(imported_channels)} channels", "INFO")
-
-            original_count = len(imported_channels)
-            imported_channels = remove_duplicates(imported_channels, settings)
-            stats['duplicates'] = original_count - len(imported_channels)
-
-            # Check against existing channels
-            existing_channels = []
-            if os.path.exists(CHANNELS_FILE):
-                with open(CHANNELS_FILE, 'r', encoding='utf-8') as f:
-                    content = f.read()
-                blocks = re.split(r'\n\s*\n+', content.strip())
-                for block in blocks:
-                    if block.strip():
-                        existing_channels.append(parse_channel_block(block))
-
-            existing_sigs = {get_channel_signature(ch) for ch in existing_channels}
-            new_channels = []
-            for channel in imported_channels:
-                if get_channel_signature(channel) not in existing_sigs:
-                    new_channels.append(channel)
-                else:
-                    stats['already_existed'] += 1
-
-            imported_channels = new_channels
-
-        stats['final_imported'] = len(imported_channels)
-
-        # Write to file
-        if imported_channels:
-            log_message(f"Writing {len(imported_channels)} new channels to file...", "INFO")
-
-            # Check if file exists and has content
-            file_exists = os.path.exists(CHANNELS_FILE) and os.path.getsize(CHANNELS_FILE) > 0
-
-            with open(CHANNELS_FILE, 'a', encoding='utf-8') as f:
-                for i, channel in enumerate(imported_channels):
-                    if i > 0 or file_exists:
-                        f.write("\n\n")
-                    f.write(convert_to_channels_txt_block(channel))
-
-            log_message(f"Successfully wrote {len(imported_channels)} channels", "INFO")
-
-    except Exception as e:
-        log_message(f"Error processing import: {e}", "ERROR")
-
-    # Enhanced statistics
-    log_message("=== ROBUST IMPORT STATISTICS ===", "INFO")
-    for key, value in stats.items():
-        log_message(f"{key.replace('_', ' ').title()}: {value}", "INFO")
-    log_message("=== END STATISTICS ===", "INFO")
-
-    # Cleanup
-    if settings.get('auto_cleanup_import', True):
-        try:
-            os.remove(IMPORT_FILE)
-            log_message(f"Cleaned up {IMPORT_FILE}", "INFO")
-        except:
-            pass
-
-    return imported_channels
 
+def setup_logging():
+    """Set up comprehensive logging."""
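+    # Send every record to both playlist_update.log and the console,
+    # mirroring what the old log_message() helper did by hand.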
+    logging.basicConfig(
+        level=logging.INFO,
+        format='[%(asctime)s] %(levelname)s: %(message)s',
+        datefmt='%Y-%m-%d %H:%M:%S',
+        handlers=[
+            logging.FileHandler('playlist_update.log', encoding='utf-8'),
+            logging.StreamHandler()
+        ]
+    )
 
 def generate_playlist():
-    """Main enhanced playlist generation function."""
-    if os.path.exists(LOG_FILE):
-        open(LOG_FILE, 'w').close()
-
-    log_message("Starting comprehensive playlist generation...", "INFO")
-
-    settings = load_settings()
-    group_overrides = load_group_overrides()
-
-    # First clean any existing corrupted entries
-    clean_corrupted_channels()
-
-    update_existing_channels_with_country_detection()
-
-    imported_channels = process_import()
-    log_message(f"Import returned {len(imported_channels)} channels", "INFO")
-
-    if not os.path.exists(CHANNELS_FILE):
-        log_message(f"Error: {CHANNELS_FILE} not found", "ERROR")
-        return
-
-    with open(CHANNELS_FILE, 'r', encoding='utf-8') as f:
-        content = f.read()
-
-    channel_blocks = re.split(r'\n\s*\n+', content.strip())
-    parsed_channels = []
-
-    for block in channel_blocks:
-        if block.strip():
-            channel = parse_channel_block(block)
-            if channel:
-                parsed_channels.append(channel)
-
-    log_message(f"Parsed {len(parsed_channels)} channels", "INFO")
-
-    parsed_channels = remove_duplicates(parsed_channels, settings)
-
-    if settings.get('sort_channels', True):
-        parsed_channels.sort(key=lambda x: (x.get('Group', '').lower(), x.get('Stream name', '').lower()))
-
-    m3u_lines = ["#EXTM3U"]
-    valid_channels = 0
-    country_stats = {}
-
-    for channel in parsed_channels:
-        stream_name = channel.get('Stream name', '')
-        group_name = channel.get('Group', 'Uncategorized')
-        logo_url = channel.get('Logo', '')
-        epg_id = channel.get('EPG id', '')
-        stream_url = channel.get('Stream URL', '')
-
-        if not stream_name or not stream_url:
-            continue
-
-        extinf_attrs = [
-            f'tvg-id="{epg_id}"',
-            f'tvg-logo="{logo_url}"',
-            f'group-title="{group_name}"',
-            f'tvg-name="{stream_name}"'
-        ]
-
-        extinf_line = f"#EXTINF:-1 {' '.join(extinf_attrs)},{stream_name}"
-        m3u_lines.append(extinf_line)
-        m3u_lines.append(stream_url)
-        valid_channels += 1
-
-        country_stats[group_name] = country_stats.get(group_name, 0) + 1
-
+    """Main playlist generation function - coordinates all modules."""
     try:
-        with open(PLAYLIST_FILE, 'w', encoding='utf-8') as f:
-            for line in m3u_lines:
-                f.write(line + '\n')
-        log_message(f"Generated {PLAYLIST_FILE} with {valid_channels} channels", "INFO")
+        setup_logging()
+        logging.info("Starting modular playlist generation...")
 
-        sorted_stats = dict(sorted(country_stats.items(), key=lambda x: x[1], reverse=True))
-        log_message(f"Channels by country: {sorted_stats}", "INFO")
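+        # All stages share this one ConfigManager instance, where the old
+        # script re-read settings.json and group_overrides.json per call.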
+        # Initialize all modules
+        config = ConfigManager()
+        file_manager = FileManager(config)
+        processor = ChannelProcessor(config)
+        builder = PlaylistBuilder(config)
+        health_checker = HealthChecker(config)
+        report_gen = ReportGenerator(config)
+
+        # Clear log file
+        if os.path.exists('playlist_update.log'):
+            open('playlist_update.log', 'w').close()
+
+        # Statistics tracking
+        stats = {
+            'total_channels': 0,
+            'valid_channels': 0,
+            'duplicates_removed': 0,
+            'imported_channels': 0,
+            'countries_detected': 0,
+            'country_distribution': {}
+        }
+
+        # Step 1: Create backup
+        file_manager.create_backup('channels.txt')
+
+        # Step 2: Clean existing corrupted entries
+        processor.clean_corrupted_channels()
+
+        # Step 3: Force update existing channels with new country detection
+        processor.update_existing_channels_with_country_detection()
+
+        # Step 4: Process imports
+        imported_channels = processor.process_import()
+        stats['imported_channels'] = len(imported_channels)
+        logging.info(f"Import returned {len(imported_channels)} channels")
+
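+        # Assumption: process_import() persists new channels to
+        # channels.txt (as the old implementation did), so the reload
+        # below picks them up together with existing entries.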
+        # Step 5: Load all channels
+        all_channels = file_manager.load_all_channels()
+        stats['total_channels'] = len(all_channels)
+
+        # Step 6: Remove duplicates
+        unique_channels = processor.remove_duplicates_optimized(all_channels)
+        stats['duplicates_removed'] = len(all_channels) - len(unique_channels)
+
+        # Step 7: Sort channels
+        if config.settings.get('sort_channels', True):
+            unique_channels.sort(key=lambda x: (x.get('Group', '').lower(), x.get('Stream name', '').lower()))
+
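+        # Disabled by default; batch_health_check presumably probes each
+        # stream URL over the network, which is slow for large playlists.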
+        # Step 8: Health check (optional)
+        health_results = {}
+        if config.settings.get('enable_health_check', False):
+            health_results = health_checker.batch_health_check(unique_channels)
+
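+        # generate_m3u() is expected to return (valid_channel_count,
+        # {group_name: channel_count}), which feeds the stats below.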
+        # Step 9: Generate M3U playlist
+        valid_channels, country_stats = builder.generate_m3u(unique_channels)
+        stats['valid_channels'] = valid_channels
+        stats['country_distribution'] = country_stats
+        stats['countries_detected'] = len(country_stats)
+
+        # Step 10: Generate report
+        report_gen.save_report(stats, health_results)
+
+        logging.info(f"Playlist generation complete: {valid_channels} channels across {len(country_stats)} countries")
+        logging.info(f"Top countries: {dict(sorted(country_stats.items(), key=lambda x: x[1], reverse=True)[:5])}")
+
+        return True
     except Exception as e:
-        log_message(f"Error writing playlist: {e}", "ERROR")
-
-    log_message("Comprehensive playlist generation complete", "INFO")
+        logging.error(f"Fatal error in playlist generation: {e}")
+        return False
 
 if __name__ == "__main__":
-    generate_playlist()
\ No newline at end of file
+    success = generate_playlist()
+    exit(0 if success else 1)
\ No newline at end of file