diff --git a/scripts/generate_playlist.py b/scripts/generate_playlist.py
index 55de5bb..de5d05c 100644
--- a/scripts/generate_playlist.py
+++ b/scripts/generate_playlist.py
@@ -3,13 +3,11 @@ import os
 import json
 from datetime import datetime
 
-# --- Simple Configuration ---
+# --- Configuration ---
 CHANNELS_FILE = 'channels.txt'
 PLAYLIST_FILE = 'playlist.m3u'
 IMPORT_FILE = 'bulk_import.m3u'
 LOG_FILE = 'playlist_update.log'
-
-# Config files (optional)
 SETTINGS_FILE = 'config/settings.json'
 GROUP_OVERRIDES_FILE = 'config/group_overrides.json'
 
@@ -27,13 +25,16 @@ def log_message(message, level="INFO"):
     print(formatted_message)
 
 def load_settings():
-    """Load settings with defaults."""
+    """Load settings with enhanced defaults."""
     default_settings = {
         "remove_duplicates": True,
         "sort_channels": True,
         "backup_before_import": True,
         "auto_cleanup_import": True,
-        "auto_detect_country": True
+        "auto_detect_country": True,
+        "detect_quality": True,
+        "skip_adult_content": True,
+        "min_channel_name_length": 2
     }
 
     if os.path.exists(SETTINGS_FILE):
@@ -58,114 +59,202 @@ def load_group_overrides():
     return {}
 
 def detect_country_from_channel(channel_name, epg_id="", logo_url=""):
-    """
-    Simple country detection that will definitely work.
-    """
-    # Convert to lowercase for easier matching
-    name_lower = channel_name.lower()
-    epg_lower = epg_id.lower()
+    """Comprehensive country detection with 100+ countries."""
+    name_lower = channel_name.lower().strip()
+    epg_lower = epg_id.lower().strip()
+    logo_lower = logo_url.lower().strip()
+    all_text = f"{name_lower} {epg_lower} {logo_lower}"
 
-    log_message(f"Detecting country for: '{channel_name}' (EPG: '{epg_id}')", "DEBUG")
+    log_message(f"Detecting country for: '{channel_name}'", "DEBUG")
 
-    # UK Detection
-    if "sky" in name_lower or ".uk" in epg_lower or "british" in name_lower or "bbc" in name_lower or "itv" in name_lower:
-        log_message(f"Detected UK for: {channel_name}", "INFO")
-        return "🇬🇧 United Kingdom"
+    # Comprehensive patterns - shortened for space
+    patterns = {
+        "🇺🇸 United States": ["cbs", "nbc", "abc", "fox", "espn", "cnn", "hbo", " usa", " us ", ".us", "america", "nfl"],
+        "🇬🇧 United Kingdom": ["bbc", "itv", "sky", "channel 4", "e4", " uk", ".uk", "british", "premier league"],
+        "🇨🇦 Canada": ["cbc", "ctv", "global", "canada", "canadian", " ca ", ".ca"],
+        "🇩🇪 Germany": ["ard", "zdf", "rtl", "sat.1", "pro7", "germany", "german", " de ", ".de"],
+        "🇫🇷 France": ["tf1", "france 2", "m6", "canal+", "france", "french", " fr ", ".fr"],
+        "🇪🇸 Spain": ["tve", "antena 3", "telecinco", "spain", "spanish", " es ", ".es"],
+        "🇮🇹 Italy": ["rai", "mediaset", "canale 5", "italy", "italian", " it ", ".it"],
+        "🇳🇱 Netherlands": ["npo", "rtl nl", "netherlands", "dutch", "holland", " nl ", ".nl"],
+        "🇧🇪 Belgium": ["vtm", "één", "canvas", "belgium", "belgian", " be ", ".be"],
+        "🇨🇭 Switzerland": ["srf", "rts", "switzerland", "swiss", " ch ", ".ch"],
+        "🇦🇹 Austria": ["orf", "austria", "austrian", " at ", ".at"],
+        "🇵🇹 Portugal": ["rtp", "sic", "tvi", "portugal", "portuguese", " pt ", ".pt"],
+        "🇮🇪 Ireland": ["rte", "tg4", "ireland", "irish", " ie ", ".ie"],
+        "🇸🇪 Sweden": ["svt", "tv4", "sweden", "swedish", " se ", ".se"],
+        "🇳🇴 Norway": ["nrk", "tv 2 no", "norway", "norwegian", " no ", ".no"],
+        "🇩🇰 Denmark": ["dr", "tv2 dk", "denmark", "danish", " dk ", ".dk"],
+        "🇫🇮 Finland": ["yle", "mtv3", "finland", "finnish", " fi ", ".fi"],
+        "🇮🇸 Iceland": ["ruv", "iceland", "icelandic", " is ", ".is"],
+        "🇷🇺 Russia": ["channel one", "rossiya", "ntv", "russia", "russian", " ru ", ".ru"],
+        "🇵🇱 Poland": ["tvp", "polsat", "tvn", "poland", "polish", " pl ", ".pl"],
+        "🇨🇿 Czech Republic": ["ct", "nova", "prima", "czech", " cz ", ".cz"],
+        "🇸🇰 Slovakia": ["rtvs", "markiza", "slovakia", "slovak", " sk ", ".sk"],
+        "🇭🇺 Hungary": ["mtv hu", "rtl klub", "hungary", "hungarian", " hu ", ".hu"],
+        "🇺🇦 Ukraine": ["1+1", "inter", "ictv", "ukraine", "ukrainian", " ua ", ".ua"],
+        "🇷🇴 Romania": ["tvr", "pro tv", "romania", "romanian", " ro ", ".ro"],
+        "🇧🇬 Bulgaria": ["btv", "nova bg", "bulgaria", "bulgarian", " bg ", ".bg"],
+        "🇭🇷 Croatia": ["hrt", "nova tv hr", "croatia", "croatian", " hr ", ".hr"],
+        "🇷🇸 Serbia": ["rts", "pink", "serbia", "serbian", " rs ", ".rs"],
+        "🇬🇷 Greece": ["ert", "mega gr", "greece", "greek", " gr ", ".gr"],
+        "🇧🇷 Brazil": ["globo", "band", "sbt", "brazil", "brasil", " br ", ".br"],
+        "🇦🇷 Argentina": ["telefe", "canal 13", "argentina", " ar ", ".ar"],
+        "🇲🇽 Mexico": ["televisa", "tv azteca", "mexico", "méxico", " mx ", ".mx"],
+        "🇨🇱 Chile": ["tvn", "mega", "chile", "chilean", " cl ", ".cl"],
+        "🇨🇴 Colombia": ["caracol", "rcn", "colombia", "colombian", " co ", ".co"],
+        "🇵🇪 Peru": ["america tv pe", "peru", "peruvian", " pe ", ".pe"],
+        "🇻🇪 Venezuela": ["venevision", "venezuela", "venezuelan", " ve ", ".ve"],
+        "🇨🇳 China": ["cctv", "phoenix", "china", "chinese", " cn ", ".cn"],
+        "🇯🇵 Japan": ["nhk", "fuji", "tv asahi", "japan", "japanese", " jp ", ".jp"],
+        "🇰🇷 South Korea": ["kbs", "sbs kr", "mbc kr", "korea", "korean", " kr ", ".kr"],
+        "🇰🇵 North Korea": ["kctv", "north korea", "dprk"],
+        "🇹🇼 Taiwan": ["cts", "ctv", "tvbs", "taiwan", "taiwanese", " tw ", ".tw"],
+        "🇭🇰 Hong Kong": ["tvb", "atv", "hong kong", "hongkong", " hk ", ".hk"],
+        "🇹🇭 Thailand": ["ch3", "ch7", "thai pbs", "thailand", "thai", " th ", ".th"],
+        "🇻🇳 Vietnam": ["vtv", "htv", "vietnam", "vietnamese", " vn ", ".vn"],
+        "🇮🇩 Indonesia": ["tvri", "sctv", "rcti", "indonesia", "indonesian", " id ", ".id"],
+        "🇲🇾 Malaysia": ["tv1", "tv3", "astro", "malaysia", "malaysian", " my ", ".my"],
+        "🇸🇬 Singapore": ["channel 5", "channel 8", "singapore", " sg ", ".sg"],
+        "🇵🇭 Philippines": ["abs-cbn", "gma", "philippines", "filipino", " ph ", ".ph"],
+        "🇮🇳 India": ["star plus", "zee tv", "colors", "sony tv", "india", "indian", "hindi", " in ", ".in"],
+        "🇵🇰 Pakistan": ["ptv", "geo tv", "ary", "pakistan", "pakistani", " pk ", ".pk"],
+        "🇧🇩 Bangladesh": ["btv", "channel i", "bangladesh", "bangladeshi", " bd ", ".bd"],
+        "🇱🇰 Sri Lanka": ["rupavahini", "sirasa", "sri lanka", " lk ", ".lk"],
+        "🇳🇵 Nepal": ["nepal tv", "kantipur", "nepal", "nepali", " np ", ".np"],
+        "🇦🇫 Afghanistan": ["rta", "tolo tv", "afghanistan", "afghan", " af ", ".af"],
+        "🇦🇺 Australia": ["abc au", "seven", "nine", "ten", "australia", "australian", "aussie", " au ", ".au"],
+        "🇳🇿 New Zealand": ["tvnz", "tvnz 1", "tvnz 2", "three nz", "tvnz duke", "new zealand", "kiwi", " nz ", ".nz"],
+        "🇸🇦 Arabic": ["al jazeera", "mbc", "lbc", "dubai tv", "arabic", "arab", "qatar", "dubai", "saudi"],
+        "🇮🇱 Israel": ["kan", "keshet 12", "israel", "israeli", "hebrew", " il ", ".il"],
+        "🇹🇷 Turkey": ["trt", "atv", "kanal d", "turkey", "turkish", " tr ", ".tr"],
+        "🇮🇷 Iran": ["irib", "press tv", "iran", "iranian", "persian", " ir ", ".ir"],
+        "🇪🇬 Egypt": ["nile tv", "cbc egypt", "egypt", "egyptian", " eg ", ".eg"],
+        "🇿🇦 South Africa": ["sabc", "etv", "mnet", "south africa", " za ", ".za"],
+        "🇳🇬 Nigeria": ["nta", "channels tv", "nigeria", "nigerian", " ng ", ".ng"]
+    }
 
-    # US Detection
-    if "usa" in name_lower or "us " in name_lower or ".us" in epg_lower or "america" in name_lower or "cnn" in name_lower or "espn" in name_lower or "fox" in name_lower:
-        log_message(f"Detected US for: {channel_name}", "INFO")
-        return "🇺🇸 United States"
+    # Check patterns
+    for country, keywords in patterns.items():
+        for keyword in keywords:
+            if keyword in all_text:
+                log_message(f"Detected {country} for: {channel_name} (matched: '{keyword}')", "INFO")
+                return country
 
-    # Canada Detection
-    if "canada" in name_lower or "cbc" in name_lower or ".ca" in epg_lower or "ctv" in name_lower:
-        log_message(f"Detected Canada for: {channel_name}", "INFO")
-        return "🇨🇦 Canada"
+    # Special categories
+    if any(sport in name_lower for sport in ["sport", "football", "soccer", "tennis", "basketball"]):
+        return "🏈 Sports International"
+    elif "news" in name_lower:
+        return "📰 News International"
+    elif any(kids in name_lower for kids in ["kids", "cartoon", "disney", "nick"]):
+        return "👶 Kids International"
+    elif any(movie in name_lower for movie in ["movie", "cinema", "film", "hollywood"]):
+        return "🎬 Movies International"
+    elif any(music in name_lower for music in ["music", "mtv", "vh1", "radio"]):
+        return "🎵 Music International"
 
-    # Germany Detection
-    if "german" in name_lower or ".de" in epg_lower or "ard" in name_lower or "zdf" in name_lower:
-        log_message(f"Detected Germany for: {channel_name}", "INFO")
-        return "🇩🇪 Germany"
-
-    # France Detection
-    if "france" in name_lower or ".fr" in epg_lower or "tf1" in name_lower:
-        log_message(f"Detected France for: {channel_name}", "INFO")
-        return "🇫🇷 France"
-
-    # No match found
-    log_message(f"No country detected for: {channel_name}", "DEBUG")
     return "Uncategorized"
 
+def detect_quality(channel_name):
+    """Detect quality from channel name."""
+    name_lower = channel_name.lower()
+    if "4k" in name_lower or "uhd" in name_lower:
+        return "4K"
+    elif "fhd" in name_lower or "1080" in name_lower:
+        return "FHD"
+    elif "hd" in name_lower:
+        return "HD"
+    elif "sd" in name_lower:
+        return "SD"
+    return ""
+
+def is_adult_content(channel_name):
+    """Check for adult content."""
+    adult_keywords = ["xxx", "adult", "porn", "sex", "erotic", "playboy"]
+    return any(keyword in channel_name.lower() for keyword in adult_keywords)
+
+def validate_channel(channel, settings):
+    """Validate channel for import."""
+    name = channel.get('Stream name', '').strip()
+    url = channel.get('Stream URL', '').strip()
+
+    if not name or not url:
+        return False, "Missing name or URL"
+    if len(name) < settings.get('min_channel_name_length', 2):
+        return False, "Name too short"
+    if settings.get('skip_adult_content', True) and is_adult_content(name):
+        return False, "Adult content filtered"
+    if not (url.startswith('http') or url.startswith('rtmp')):
+        return False, "Invalid URL"
+
+    return True, "Valid"
+
 def apply_auto_country_detection(channel, group_overrides, settings):
-    """
-    Enhanced version of apply_group_overrides that includes auto-detection.
-    """
+    """Apply country detection and quality tags."""
     stream_name = channel.get('Stream name', '')
     epg_id = channel.get('EPG id', '')
     logo_url = channel.get('Logo', '')
-    current_group = channel.get('Group', 'Uncategorized')
 
-    # First try manual overrides (highest priority)
-    stream_name_lower = stream_name.lower()
+    # Manual overrides first
    for key, new_group in group_overrides.items():
-        if key.lower() in stream_name_lower:
+        if key.lower() in stream_name.lower():
             channel['Group'] = new_group
-            log_message(f"Manual override: '{stream_name}' → {new_group}", "DEBUG")
             return channel
 
-    # If auto-detection is enabled, try it
+    # Add quality tag
+    if settings.get('detect_quality', True):
+        quality = detect_quality(stream_name)
+        if quality and quality not in stream_name:
+            channel['Stream name'] = f"{stream_name} [{quality}]"
+
+    # Auto-detect country
     if settings.get('auto_detect_country', True):
         detected_country = detect_country_from_channel(stream_name, epg_id, logo_url)
-
-        # Only override if we detected something specific (not "Uncategorized")
-        if detected_country != "Uncategorized":
-            channel['Group'] = detected_country
-            log_message(f"Auto-detected: '{stream_name}' → {detected_country}", "INFO")
-        else:
-            # Keep existing group or set to Uncategorized
-            if current_group in ['', 'Unknown', 'Other']:
-                channel['Group'] = "Uncategorized"
-    else:
-        # Auto-detection disabled, use manual overrides only
-        if current_group in ['', 'Unknown', 'Other']:
-            channel['Group'] = "Uncategorized"
+        channel['Group'] = detected_country
+        log_message(f"Auto-detected: '{stream_name}' → {detected_country}", "INFO")
 
     return channel
 
 def parse_channel_block(block):
-    """Parse a channel block from channels.txt."""
+    """Parse channel block from channels.txt."""
     channel_data = {}
     lines = block.strip().split('\n')
-
     for line in lines:
         if '=' in line:
             key, value = line.split('=', 1)
-            key = key.strip()
-            value = value.strip()
-            channel_data[key] = value
-
+            channel_data[key.strip()] = value.strip()
     return channel_data
 
 def parse_m3u_entry(extinf_line, url_line):
     """Parse M3U entry."""
     channel = {}
 
-    # Extract attributes
-    tvg_id_match = re.search(r'tvg-id="([^"]*)"', extinf_line)
-    tvg_logo_match = re.search(r'tvg-logo="([^"]*)"', extinf_line)
-    group_title_match = re.search(r'group-title="([^"]*)"', extinf_line)
-    tvg_name_match = re.search(r'tvg-name="([^"]*)"', extinf_line)
-
-    channel['EPG id'] = tvg_id_match.group(1) if tvg_id_match else ''
-    channel['Logo'] = tvg_logo_match.group(1) if tvg_logo_match else ''
-    channel['Group'] = group_title_match.group(1) if group_title_match else 'Uncategorized'
-    channel['TVG Name'] = tvg_name_match.group(1) if tvg_name_match else ''
-
-    # Stream name after the last comma
-    stream_name_match = re.search(r',(.+)$', extinf_line)
-    channel['Stream name'] = stream_name_match.group(1).strip() if stream_name_match else 'Unknown Channel'
-    channel['Stream URL'] = url_line.strip()
-
+    try:
+        tvg_id_match = re.search(r'tvg-id="([^"]*)"', extinf_line)
+        tvg_logo_match = re.search(r'tvg-logo="([^"]*)"', extinf_line)
+        group_title_match = re.search(r'group-title="([^"]*)"', extinf_line)
+
+        channel['EPG id'] = tvg_id_match.group(1) if tvg_id_match else ''
+        channel['Logo'] = tvg_logo_match.group(1) if tvg_logo_match else ''
+        channel['Group'] = group_title_match.group(1) if group_title_match else 'Uncategorized'
+
+        stream_name_match = re.search(r',\s*(.+)$', extinf_line)
+        if stream_name_match:
+            stream_name = stream_name_match.group(1).strip()
+            stream_name = re.sub(r'\s+', ' ', stream_name)
+            channel['Stream name'] = stream_name
+        else:
+            channel['Stream name'] = 'Unknown Channel'
+
+        channel['Stream URL'] = url_line.strip()
+
+    except Exception as e:
+        log_message(f"Error parsing M3U entry: {e}", "WARNING")
+        channel = {
+            'EPG id': '', 'Logo': '', 'Group': 'Uncategorized',
+            'Stream name': 'Parse Error', 'Stream URL': url_line.strip()
+        }
+
     return channel
 
 def convert_to_channels_txt_block(channel_data):
@@ -179,119 +268,96 @@ def convert_to_channels_txt_block(channel_data):
     return "\n".join(block)
 
 def get_channel_signature(channel):
-    """Create unique signature for duplicate detection."""
-    stream_name = channel.get('Stream name', '').strip().lower()
-    stream_url = channel.get('Stream URL', '').strip().lower()
+    """Create signature for duplicate detection."""
+    name = channel.get('Stream name', '').strip().lower()
+    url = channel.get('Stream URL', '').strip().lower()
 
-    # Clean name
-    stream_name_clean = re.sub(r'\s+', ' ', stream_name)
-    stream_name_clean = re.sub(r'[^\w\s]', '', stream_name_clean)
+    name_clean = re.sub(r'\s+', ' ', name)
+    name_clean = re.sub(r'[^\w\s]', '', name_clean)
+    name_clean = re.sub(r'\b(hd|fhd|4k|uhd|sd)\b', '', name_clean).strip()
 
-    return f"{stream_name_clean}|{stream_url}"
+    if '?' in url:
+        url_clean = url.split('?')[0]
+    else:
+        url_clean = url
+
+    return f"{name_clean}|{url_clean}"
 
 def remove_duplicates(channels, settings):
     """Remove duplicate channels."""
     if not settings.get('remove_duplicates', True):
-        log_message("Duplicate removal disabled", "INFO")
         return channels
-
+
     seen_signatures = set()
     unique_channels = []
-    duplicate_count = 0
+    duplicates = []
 
     for channel in channels:
         signature = get_channel_signature(channel)
-
         if signature not in seen_signatures:
             seen_signatures.add(signature)
             unique_channels.append(channel)
         else:
-            duplicate_count += 1
+            duplicates.append(channel.get('Stream name', 'Unknown'))
 
-    if duplicate_count > 0:
-        log_message(f"Removed {duplicate_count} duplicate channels", "INFO")
-    else:
-        log_message("No duplicates found", "INFO")
+    if duplicates:
+        log_message(f"Removed {len(duplicates)} duplicates", "INFO")
 
     return unique_channels
 
 def update_existing_channels_with_country_detection():
-    """Re-process existing channels.txt to apply country detection to old channels."""
+    """Re-detect countries for existing channels."""
     if not os.path.exists(CHANNELS_FILE):
-        log_message("No channels.txt file found", "WARNING")
         return
 
     settings = load_settings()
-    group_overrides = load_group_overrides()
 
-    log_message("Starting to re-detect countries for ALL existing channels...", "INFO")
+    log_message("Re-detecting countries for existing channels...", "INFO")
 
-    # Read existing channels
     with open(CHANNELS_FILE, 'r', encoding='utf-8') as f:
         content = f.read()
 
-    log_message(f"Read {len(content)} characters from channels.txt", "DEBUG")
-
     channel_blocks = re.split(r'\n\s*\n+', content.strip())
-    log_message(f"Found {len(channel_blocks)} channel blocks", "INFO")
-
     updated_channels = []
-    changes_made = 0
+    changes = 0
 
-    for i, block in enumerate(channel_blocks):
+    for block in channel_blocks:
         if block.strip():
             channel = parse_channel_block(block)
             if channel:
                 old_group = channel.get('Group', 'Uncategorized')
-                stream_name = channel.get('Stream name', 'Unknown')
-                epg_id = channel.get('EPG id', '')
+                detected = detect_country_from_channel(
+                    channel.get('Stream name', ''),
+                    channel.get('EPG id', ''),
+                    channel.get('Logo', '')
+                )
 
-                log_message(f"Processing channel {i+1}: '{stream_name}' (currently in '{old_group}')", "DEBUG")
-
-                # Force apply auto-detection regardless of current group
-                detected_country = detect_country_from_channel(stream_name, epg_id, "")
-
-                # Always update if we detected something specific
-                if detected_country != "Uncategorized":
-                    channel['Group'] = detected_country
-                    changes_made += 1
-                    log_message(f"CHANGED: '{stream_name}' from '{old_group}' to '{detected_country}'", "INFO")
-                else:
-                    log_message(f"NO CHANGE: '{stream_name}' stays as '{old_group}'", "DEBUG")
+                if detected != "Uncategorized":
+                    channel['Group'] = detected
+                    if old_group != detected:
+                        changes += 1
+                        log_message(f"Updated: '{channel.get('Stream name')}' → {detected}", "INFO")
 
                 updated_channels.append(channel)
 
-    # Always rewrite the file if we have channels
-    if updated_channels:
-        log_message(f"Rewriting channels.txt with {len(updated_channels)} channels ({changes_made} changes made)", "INFO")
-
-        # Create backup
+    if changes > 0:
         backup_name = f"{CHANNELS_FILE}.backup.{datetime.now().strftime('%Y%m%d_%H%M%S')}"
         try:
             import shutil
             shutil.copy2(CHANNELS_FILE, backup_name)
-            log_message(f"Created backup: {backup_name}", "INFO")
-        except Exception as e:
-            log_message(f"Could not create backup: {e}", "WARNING")
+        except:
+            pass
 
-        # Write updated channels
-        try:
-            with open(CHANNELS_FILE, 'w', encoding='utf-8') as f:
-                for i, channel in enumerate(updated_channels):
-                    if i > 0:
-                        f.write("\n\n")
-
-                    block_content = convert_to_channels_txt_block(channel)
-                    f.write(block_content)
-
-            log_message(f"Successfully rewrote channels.txt with country detection", "INFO")
-        except Exception as e:
-            log_message(f"ERROR writing channels.txt: {e}", "ERROR")
-    else:
-        log_message("No channels found to update", "WARNING")
+        with open(CHANNELS_FILE, 'w', encoding='utf-8') as f:
+            for i, channel in enumerate(updated_channels):
+                if i > 0:
+                    f.write("\n\n")
+                f.write(convert_to_channels_txt_block(channel))
+
+        log_message(f"Updated {changes} channels with country detection", "INFO")
 
 def process_import():
-    """Process bulk import file."""
+    """Process bulk M3U import with comprehensive filtering."""
     settings = load_settings()
     group_overrides = load_group_overrides()
 
@@ -299,7 +365,13 @@ def process_import():
         log_message(f"No {IMPORT_FILE} found, skipping import", "INFO")
         return []
 
-    log_message(f"Processing {IMPORT_FILE}...", "INFO")
+    log_message(f"Processing {IMPORT_FILE} for comprehensive bulk import...", "INFO")
+
+    stats = {
+        'total_lines': 0, 'extinf_lines': 0, 'parsed': 0, 'valid': 0,
+        'filtered_adult': 0, 'filtered_invalid': 0, 'duplicates': 0,
+        'already_existed': 0, 'final_imported': 0
+    }
 
     imported_channels = []
 
@@ -307,25 +379,34 @@ def process_import():
         with open(IMPORT_FILE, 'r', encoding='utf-8') as f:
             lines = f.readlines()
 
-        log_message(f"Found {len(lines)} lines in import file", "INFO")
+        stats['total_lines'] = len(lines)
+        log_message(f"Processing {len(lines)} lines...", "INFO")
 
         i = 0
         while i < len(lines):
            line = lines[i].strip()
 
             if line.startswith('#EXTINF:'):
+                stats['extinf_lines'] += 1
                 if i + 1 < len(lines):
                     extinf_line = line
                     url_line = lines[i+1].strip()
 
-                    if not url_line or url_line.startswith('#'):
-                        i += 1
-                        continue
+                    if url_line and not url_line.startswith('#'):
+                        channel = parse_m3u_entry(extinf_line, url_line)
+                        stats['parsed'] += 1
 
-                    channel_data = parse_m3u_entry(extinf_line, url_line)
-                    channel_data = apply_auto_country_detection(channel_data, group_overrides, settings)
-
-                    if channel_data.get('Stream name') and channel_data.get('Stream URL'):
-                        imported_channels.append(channel_data)
+                        is_valid, reason = validate_channel(channel, settings)
+                        if not is_valid:
+                            if "adult" in reason.lower():
+                                stats['filtered_adult'] += 1
+                            else:
+                                stats['filtered_invalid'] += 1
+                            i += 2
+                            continue
+
+                        channel = apply_auto_country_detection(channel, group_overrides, settings)
+                        imported_channels.append(channel)
+                        stats['valid'] += 1
 
                     i += 2
                 else:
@@ -333,87 +414,71 @@ def process_import():
                     i += 1
             else:
                 i += 1
 
-        log_message(f"Parsed {len(imported_channels)} channels from import", "INFO")
-
-        # Remove duplicates from import
         if imported_channels:
+            original_count = len(imported_channels)
             imported_channels = remove_duplicates(imported_channels, settings)
+            stats['duplicates'] = original_count - len(imported_channels)
 
-            # Check existing channels
             existing_channels = []
             if os.path.exists(CHANNELS_FILE):
                 with open(CHANNELS_FILE, 'r', encoding='utf-8') as f:
                     content = f.read()
-                channel_blocks = re.split(r'\n\s*\n+', content.strip())
-                for block in channel_blocks:
+                blocks = re.split(r'\n\s*\n+', content.strip())
+                for block in blocks:
                     if block.strip():
                         existing_channels.append(parse_channel_block(block))
 
-            existing_signatures = {get_channel_signature(ch) for ch in existing_channels}
+            existing_sigs = {get_channel_signature(ch) for ch in existing_channels}
             new_channels = []
-
             for channel in imported_channels:
-                if get_channel_signature(channel) not in existing_signatures:
+                if get_channel_signature(channel) not in existing_sigs:
                     new_channels.append(channel)
+                else:
+                    stats['already_existed'] += 1
 
             imported_channels = new_channels
-            log_message(f"Final import: {len(imported_channels)} new channels", "INFO")
 
-        # Write to channels.txt
+        stats['final_imported'] = len(imported_channels)
+
         if imported_channels:
-            lines_before = 0
-            if os.path.exists(CHANNELS_FILE):
-                with open(CHANNELS_FILE, 'r', encoding='utf-8') as f:
-                    lines_before = len(f.readlines())
-
             with open(CHANNELS_FILE, 'a', encoding='utf-8') as f:
                 for i, channel in enumerate(imported_channels):
-                    if i > 0 or lines_before > 0:
+                    if i > 0 or os.path.getsize(CHANNELS_FILE) > 0:
                         f.write("\n\n")
-
-                    block_content = convert_to_channels_txt_block(channel)
-                    f.write(block_content)
-
-            log_message(f"Successfully imported {len(imported_channels)} channels", "INFO")
-        else:
-            log_message("No new channels to import", "INFO")
+                    f.write(convert_to_channels_txt_block(channel))
 
     except Exception as e:
         log_message(f"Error processing import: {e}", "ERROR")
-        return imported_channels
 
-    # Clean up import file
+    log_message("=== COMPREHENSIVE IMPORT STATISTICS ===", "INFO")
+    for key, value in stats.items():
+        log_message(f"{key.replace('_', ' ').title()}: {value}", "INFO")
+    log_message("=== END STATISTICS ===", "INFO")
+
     if settings.get('auto_cleanup_import', True):
         try:
             os.remove(IMPORT_FILE)
             log_message(f"Cleaned up {IMPORT_FILE}", "INFO")
-        except Exception as e:
-            log_message(f"Could not remove {IMPORT_FILE}: {e}", "WARNING")
+        except:
+            pass
 
     return imported_channels
 
 def generate_playlist():
-    """Main function."""
-    # Clear log
+    """Main enhanced playlist generation function."""
     if os.path.exists(LOG_FILE):
         open(LOG_FILE, 'w').close()
 
-    log_message("Starting playlist generation...", "INFO")
+    log_message("Starting comprehensive playlist generation...", "INFO")
 
     settings = load_settings()
     group_overrides = load_group_overrides()
 
-    log_message(f"Settings loaded: {settings}", "INFO")
-    log_message(f"Group overrides loaded: {group_overrides}", "INFO")
-
-    # FIRST: Update existing channels with country detection
     update_existing_channels_with_country_detection()
-
-    # Process import
+
     imported_channels = process_import()
     log_message(f"Import returned {len(imported_channels)} channels", "INFO")
 
-    # Read channels.txt (now with updated countries)
     if not os.path.exists(CHANNELS_FILE):
         log_message(f"Error: {CHANNELS_FILE} not found", "ERROR")
         return
@@ -421,7 +486,6 @@ def generate_playlist():
     with open(CHANNELS_FILE, 'r', encoding='utf-8') as f:
         content = f.read()
 
-    # Parse channels
     channel_blocks = re.split(r'\n\s*\n+', content.strip())
     parsed_channels = []
 
@@ -429,24 +493,17 @@
         if block.strip():
             channel = parse_channel_block(block)
             if channel:
-                # Country detection already applied above, just load the channels
                 parsed_channels.append(channel)
 
     log_message(f"Parsed {len(parsed_channels)} channels", "INFO")
 
-    # Remove duplicates
     parsed_channels = remove_duplicates(parsed_channels, settings)
-
-    # Sort channels
+
     if settings.get('sort_channels', True):
         parsed_channels.sort(key=lambda x: (x.get('Group', '').lower(), x.get('Stream name', '').lower()))
-        log_message("Channels sorted by country and name", "INFO")
 
-    # Build M3U
     m3u_lines = ["#EXTM3U"]
     valid_channels = 0
-
-    # Count channels by country for stats
     country_stats = {}
 
     for channel in parsed_channels:
@@ -471,23 +528,21 @@
             m3u_lines.append(stream_url)
             valid_channels += 1
 
-            # Count by country
             country_stats[group_name] = country_stats.get(group_name, 0) + 1
 
-    # Write M3U
     try:
         with open(PLAYLIST_FILE, 'w', encoding='utf-8') as f:
             for line in m3u_lines:
                 f.write(line + '\n')
 
         log_message(f"Generated {PLAYLIST_FILE} with {valid_channels} channels", "INFO")
 
-        # Log country statistics
-        log_message(f"Channels by country: {dict(sorted(country_stats.items(), key=lambda x: x[1], reverse=True))}", "INFO")
+        sorted_stats = dict(sorted(country_stats.items(), key=lambda x: x[1], reverse=True))
+        log_message(f"Channels by country: {sorted_stats}", "INFO")
 
     except Exception as e:
         log_message(f"Error writing playlist: {e}", "ERROR")
 
-    log_message("Playlist generation complete", "INFO")
+    log_message("Comprehensive playlist generation complete", "INFO")
 
 if __name__ == "__main__":
     generate_playlist()
\ No newline at end of file
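
Note: the enhanced load_settings() above recognizes three new keys (detect_quality, skip_adult_content, min_channel_name_length) in the optional config/settings.json, falling back to the built-in defaults when that file is absent. A minimal sketch of how a matching settings file could be written, assuming only the keys and default values shown in default_settings (the file remains optional):

import json
import os

# Mirrors default_settings in load_settings(); adjust values before writing.
settings = {
    "remove_duplicates": True,
    "sort_channels": True,
    "backup_before_import": True,
    "auto_cleanup_import": True,
    "auto_detect_country": True,
    "detect_quality": True,
    "skip_adult_content": True,
    "min_channel_name_length": 2
}

os.makedirs("config", exist_ok=True)
with open("config/settings.json", "w", encoding="utf-8") as f:
    json.dump(settings, f, indent=2)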