import re import os import json from datetime import datetime # --- Configuration --- CHANNELS_FILE = 'channels.txt' PLAYLIST_FILE = 'playlist.m3u' IMPORT_FILE = 'bulk_import.m3u' LOG_FILE = 'playlist_update.log' SETTINGS_FILE = 'config/settings.json' GROUP_OVERRIDES_FILE = 'config/group_overrides.json' def log_message(message, level="INFO"): """Logs messages to file and prints them.""" timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") formatted_message = f"[{timestamp}] {level}: {message}" try: with open(LOG_FILE, 'a', encoding='utf-8') as f: f.write(formatted_message + "\n") except Exception as e: print(f"ERROR: Could not write to log: {e}") print(formatted_message) def load_settings(): """Load settings with enhanced defaults.""" default_settings = { "remove_duplicates": True, "sort_channels": True, "backup_before_import": True, "auto_cleanup_import": True, "auto_detect_country": True, "detect_quality": True, "skip_adult_content": True, "min_channel_name_length": 2 } if os.path.exists(SETTINGS_FILE): try: with open(SETTINGS_FILE, 'r', encoding='utf-8') as f: settings = json.load(f) return {**default_settings, **settings} except Exception as e: log_message(f"Could not load settings, using defaults: {e}", "WARNING") return default_settings def load_group_overrides(): """Load group overrides.""" if os.path.exists(GROUP_OVERRIDES_FILE): try: with open(GROUP_OVERRIDES_FILE, 'r', encoding='utf-8') as f: return json.load(f) except Exception as e: log_message(f"Could not load group overrides: {e}", "WARNING") return {} def detect_country_from_channel(channel_name, epg_id="", logo_url=""): """Comprehensive country detection with 100+ countries.""" name_lower = channel_name.lower().strip() epg_lower = epg_id.lower().strip() logo_lower = logo_url.lower().strip() all_text = f"{name_lower} {epg_lower} {logo_lower}" log_message(f"Detecting country for: '{channel_name}'", "DEBUG") # Comprehensive patterns - shortened for space patterns = { "๐Ÿ‡บ๐Ÿ‡ธ United States": ["cbs", "nbc", "abc", "fox", "espn", "cnn", "hbo", " usa", " us ", ".us", "america", "nfl"], "๐Ÿ‡ฌ๐Ÿ‡ง United Kingdom": ["bbc", "itv", "sky", "channel 4", "e4", " uk", ".uk", "british", "premier league"], "๐Ÿ‡จ๐Ÿ‡ฆ Canada": ["cbc", "ctv", "global", "canada", "canadian", " ca ", ".ca"], "๐Ÿ‡ฉ๐Ÿ‡ช Germany": ["ard", "zdf", "rtl", "sat.1", "pro7", "germany", "german", " de ", ".de"], "๐Ÿ‡ซ๐Ÿ‡ท France": ["tf1", "france 2", "m6", "canal+", "france", "french", " fr ", ".fr"], "๐Ÿ‡ช๐Ÿ‡ธ Spain": ["tve", "antena 3", "telecinco", "spain", "spanish", " es ", ".es"], "๐Ÿ‡ฎ๐Ÿ‡น Italy": ["rai", "mediaset", "canale 5", "italy", "italian", " it ", ".it"], "๐Ÿ‡ณ๐Ÿ‡ฑ Netherlands": ["npo", "rtl nl", "netherlands", "dutch", "holland", " nl ", ".nl"], "๐Ÿ‡ง๐Ÿ‡ช Belgium": ["vtm", "รฉรฉn", "canvas", "belgium", "belgian", " be ", ".be"], "๐Ÿ‡จ๐Ÿ‡ญ Switzerland": ["srf", "rts", "switzerland", "swiss", " ch ", ".ch"], "๐Ÿ‡ฆ๐Ÿ‡น Austria": ["orf", "austria", "austrian", " at ", ".at"], "๐Ÿ‡ต๐Ÿ‡น Portugal": ["rtp", "sic", "tvi", "portugal", "portuguese", " pt ", ".pt"], "๐Ÿ‡ฎ๐Ÿ‡ช Ireland": ["rte", "tg4", "ireland", "irish", " ie ", ".ie"], "๐Ÿ‡ธ๐Ÿ‡ช Sweden": ["svt", "tv4", "sweden", "swedish", " se ", ".se"], "๐Ÿ‡ณ๐Ÿ‡ด Norway": ["nrk", "tv 2 no", "norway", "norwegian", " no ", ".no"], "๐Ÿ‡ฉ๐Ÿ‡ฐ Denmark": ["dr", "tv2 dk", "denmark", "danish", " dk ", ".dk"], "๐Ÿ‡ซ๐Ÿ‡ฎ Finland": ["yle", "mtv3", "finland", "finnish", " fi ", ".fi"], "๐Ÿ‡ฎ๐Ÿ‡ธ Iceland": ["ruv", "iceland", "icelandic", " is ", ".is"], "๐Ÿ‡ท๐Ÿ‡บ Russia": ["channel one", "rossiya", "ntv", "russia", "russian", " ru ", ".ru"], "๐Ÿ‡ต๐Ÿ‡ฑ Poland": ["tvp", "polsat", "tvn", "poland", "polish", " pl ", ".pl"], "๐Ÿ‡จ๐Ÿ‡ฟ Czech Republic": ["ct", "nova", "prima", "czech", " cz ", ".cz"], "๐Ÿ‡ธ๐Ÿ‡ฐ Slovakia": ["rtvs", "markiza", "slovakia", "slovak", " sk ", ".sk"], "๐Ÿ‡ญ๐Ÿ‡บ Hungary": ["mtv hu", "rtl klub", "hungary", "hungarian", " hu ", ".hu"], "๐Ÿ‡บ๐Ÿ‡ฆ Ukraine": ["1+1", "inter", "ictv", "ukraine", "ukrainian", " ua ", ".ua"], "๐Ÿ‡ท๐Ÿ‡ด Romania": ["tvr", "pro tv", "romania", "romanian", " ro ", ".ro"], "๐Ÿ‡ง๐Ÿ‡ฌ Bulgaria": ["btv", "nova bg", "bulgaria", "bulgarian", " bg ", ".bg"], "๐Ÿ‡ญ๐Ÿ‡ท Croatia": ["hrt", "nova tv hr", "croatia", "croatian", " hr ", ".hr"], "๐Ÿ‡ท๐Ÿ‡ธ Serbia": ["rts", "pink", "serbia", "serbian", " rs ", ".rs"], "๐Ÿ‡ฌ๐Ÿ‡ท Greece": ["ert", "mega gr", "greece", "greek", " gr ", ".gr"], "๐Ÿ‡ง๐Ÿ‡ท Brazil": ["globo", "band", "sbt", "brazil", "brasil", " br ", ".br"], "๐Ÿ‡ฆ๐Ÿ‡ท Argentina": ["telefe", "canal 13", "argentina", " ar ", ".ar"], "๐Ÿ‡ฒ๐Ÿ‡ฝ Mexico": ["televisa", "tv azteca", "mexico", "mรฉxico", " mx ", ".mx"], "๐Ÿ‡จ๐Ÿ‡ฑ Chile": ["tvn", "mega", "chile", "chilean", " cl ", ".cl"], "๐Ÿ‡จ๐Ÿ‡ด Colombia": ["caracol", "rcn", "colombia", "colombian", " co ", ".co"], "๐Ÿ‡ต๐Ÿ‡ช Peru": ["america tv pe", "peru", "peruvian", " pe ", ".pe"], "๐Ÿ‡ป๐Ÿ‡ช Venezuela": ["venevision", "venezuela", "venezuelan", " ve ", ".ve"], "๐Ÿ‡จ๐Ÿ‡ณ China": ["cctv", "phoenix", "china", "chinese", " cn ", ".cn"], "๐Ÿ‡ฏ๐Ÿ‡ต Japan": ["nhk", "fuji", "tv asahi", "japan", "japanese", " jp ", ".jp"], "๐Ÿ‡ฐ๐Ÿ‡ท South Korea": ["kbs", "sbs kr", "mbc kr", "korea", "korean", " kr ", ".kr"], "๐Ÿ‡ฐ๐Ÿ‡ต North Korea": ["kctv", "north korea", "dprk"], "๐Ÿ‡น๐Ÿ‡ผ Taiwan": ["cts", "ctv", "tvbs", "taiwan", "taiwanese", " tw ", ".tw"], "๐Ÿ‡ญ๐Ÿ‡ฐ Hong Kong": ["tvb", "atv", "hong kong", "hongkong", " hk ", ".hk"], "๐Ÿ‡น๐Ÿ‡ญ Thailand": ["ch3", "ch7", "thai pbs", "thailand", "thai", " th ", ".th"], "๐Ÿ‡ป๐Ÿ‡ณ Vietnam": ["vtv", "htv", "vietnam", "vietnamese", " vn ", ".vn"], "๐Ÿ‡ฎ๐Ÿ‡ฉ Indonesia": ["tvri", "sctv", "rcti", "indonesia", "indonesian", " id ", ".id"], "๐Ÿ‡ฒ๐Ÿ‡พ Malaysia": ["tv1", "tv3", "astro", "malaysia", "malaysian", " my ", ".my"], "๐Ÿ‡ธ๐Ÿ‡ฌ Singapore": ["channel 5", "channel 8", "singapore", " sg ", ".sg"], "๐Ÿ‡ต๐Ÿ‡ญ Philippines": ["abs-cbn", "gma", "philippines", "filipino", " ph ", ".ph"], "๐Ÿ‡ฎ๐Ÿ‡ณ India": ["star plus", "zee tv", "colors", "sony tv", "india", "indian", "hindi", " in ", ".in"], "๐Ÿ‡ต๐Ÿ‡ฐ Pakistan": ["ptv", "geo tv", "ary", "pakistan", "pakistani", " pk ", ".pk"], "๐Ÿ‡ง๐Ÿ‡ฉ Bangladesh": ["btv", "channel i", "bangladesh", "bangladeshi", " bd ", ".bd"], "๐Ÿ‡ฑ๐Ÿ‡ฐ Sri Lanka": ["rupavahini", "sirasa", "sri lanka", " lk ", ".lk"], "๐Ÿ‡ณ๐Ÿ‡ต Nepal": ["nepal tv", "kantipur", "nepal", "nepali", " np ", ".np"], "๐Ÿ‡ฆ๐Ÿ‡ซ Afghanistan": ["rta", "tolo tv", "afghanistan", "afghan", " af ", ".af"], "๐Ÿ‡ฆ๐Ÿ‡บ Australia": ["abc au", "seven", "nine", "ten", "australia", "australian", "aussie", " au ", ".au"], "๐Ÿ‡ณ๐Ÿ‡ฟ New Zealand": ["tvnz", "tvnz 1", "tvnz 2", "three nz", "tvnz duke", "new zealand", "kiwi", " nz ", ".nz"], "๐Ÿ‡ธ๐Ÿ‡ฆ Arabic": ["al jazeera", "mbc", "lbc", "dubai tv", "arabic", "arab", "qatar", "dubai", "saudi"], "๐Ÿ‡ฎ๐Ÿ‡ฑ Israel": ["kan", "keshet 12", "israel", "israeli", "hebrew", " il ", ".il"], "๐Ÿ‡น๐Ÿ‡ท Turkey": ["trt", "atv", "kanal d", "turkey", "turkish", " tr ", ".tr"], "๐Ÿ‡ฎ๐Ÿ‡ท Iran": ["irib", "press tv", "iran", "iranian", "persian", " ir ", ".ir"], "๐Ÿ‡ช๐Ÿ‡ฌ Egypt": ["nile tv", "cbc egypt", "egypt", "egyptian", " eg ", ".eg"], "๐Ÿ‡ฟ๐Ÿ‡ฆ South Africa": ["sabc", "etv", "mnet", "south africa", " za ", ".za"], "๐Ÿ‡ณ๐Ÿ‡ฌ Nigeria": ["nta", "channels tv", "nigeria", "nigerian", " ng ", ".ng"] } # Check patterns for country, keywords in patterns.items(): for keyword in keywords: if keyword in all_text: log_message(f"Detected {country} for: {channel_name} (matched: '{keyword}')", "INFO") return country # No special categories - everything unmatched goes to Uncategorized log_message(f"No country detected for: {channel_name}", "DEBUG") return "Uncategorized" def detect_quality(channel_name): """Detect quality from channel name.""" name_lower = channel_name.lower() if "4k" in name_lower or "uhd" in name_lower: return "4K" elif "fhd" in name_lower or "1080" in name_lower: return "FHD" elif "hd" in name_lower: return "HD" elif "sd" in name_lower: return "SD" return "" def is_adult_content(channel_name): """Check for adult content.""" adult_keywords = ["xxx", "adult", "porn", "sex", "erotic", "playboy"] return any(keyword in channel_name.lower() for keyword in adult_keywords) def validate_channel(channel, settings): """Validate channel for import.""" name = channel.get('Stream name', '').strip() url = channel.get('Stream URL', '').strip() if not name or not url: return False, "Missing name or URL" if len(name) < settings.get('min_channel_name_length', 2): return False, "Name too short" if settings.get('skip_adult_content', True) and is_adult_content(name): return False, "Adult content filtered" if not (url.startswith('http') or url.startswith('rtmp')): return False, "Invalid URL" return True, "Valid" def apply_auto_country_detection(channel, group_overrides, settings): """Apply country detection and quality tags.""" stream_name = channel.get('Stream name', '') epg_id = channel.get('EPG id', '') logo_url = channel.get('Logo', '') # Manual overrides first for key, new_group in group_overrides.items(): if key.lower() in stream_name.lower(): channel['Group'] = new_group return channel # Add quality tag if settings.get('detect_quality', True): quality = detect_quality(stream_name) if quality and quality not in stream_name: channel['Stream name'] = f"{stream_name} [{quality}]" # Auto-detect country if settings.get('auto_detect_country', True): detected_country = detect_country_from_channel(stream_name, epg_id, logo_url) channel['Group'] = detected_country log_message(f"Auto-detected: '{stream_name}' โ†’ {detected_country}", "INFO") return channel def parse_channel_block(block): """Parse channel block from channels.txt.""" channel_data = {} lines = block.strip().split('\n') for line in lines: if '=' in line: key, value = line.split('=', 1) channel_data[key.strip()] = value.strip() return channel_data def parse_m3u_entry(extinf_line, url_line): """Parse M3U entry.""" channel = {} try: tvg_id_match = re.search(r'tvg-id="([^"]*)"', extinf_line) tvg_logo_match = re.search(r'tvg-logo="([^"]*)"', extinf_line) group_title_match = re.search(r'group-title="([^"]*)"', extinf_line) channel['EPG id'] = tvg_id_match.group(1) if tvg_id_match else '' channel['Logo'] = tvg_logo_match.group(1) if tvg_logo_match else '' channel['Group'] = group_title_match.group(1) if group_title_match else 'Uncategorized' stream_name_match = re.search(r',\s*(.+)$', extinf_line) if stream_name_match: stream_name = stream_name_match.group(1).strip() stream_name = re.sub(r'\s+', ' ', stream_name) channel['Stream name'] = stream_name else: channel['Stream name'] = 'Unknown Channel' channel['Stream URL'] = url_line.strip() except Exception as e: log_message(f"Error parsing M3U entry: {e}", "WARNING") channel = { 'EPG id': '', 'Logo': '', 'Group': 'Uncategorized', 'Stream name': 'Parse Error', 'Stream URL': url_line.strip() } return channel def convert_to_channels_txt_block(channel_data): """Convert to channels.txt format.""" block = [] block.append(f"Group = {channel_data.get('Group', 'Uncategorized')}") block.append(f"Stream name = {channel_data.get('Stream name', 'Unknown Channel')}") block.append(f"Logo = {channel_data.get('Logo', '')}") block.append(f"EPG id = {channel_data.get('EPG id', '')}") block.append(f"Stream URL = {channel_data.get('Stream URL', '')}") return "\n".join(block) def get_channel_signature(channel): """Create signature for duplicate detection.""" name = channel.get('Stream name', '').strip().lower() url = channel.get('Stream URL', '').strip().lower() name_clean = re.sub(r'\s+', ' ', name) name_clean = re.sub(r'[^\w\s]', '', name_clean) name_clean = re.sub(r'\b(hd|fhd|4k|uhd|sd)\b', '', name_clean).strip() if '?' in url: url_clean = url.split('?')[0] else: url_clean = url return f"{name_clean}|{url_clean}" def remove_duplicates(channels, settings): """Remove duplicate channels.""" if not settings.get('remove_duplicates', True): return channels seen_signatures = set() unique_channels = [] duplicates = [] for channel in channels: signature = get_channel_signature(channel) if signature not in seen_signatures: seen_signatures.add(signature) unique_channels.append(channel) else: duplicates.append(channel.get('Stream name', 'Unknown')) if duplicates: log_message(f"Removed {len(duplicates)} duplicates", "INFO") return unique_channels def update_existing_channels_with_country_detection(): """Re-detect countries for existing channels.""" if not os.path.exists(CHANNELS_FILE): return settings = load_settings() log_message("Re-detecting countries for existing channels...", "INFO") with open(CHANNELS_FILE, 'r', encoding='utf-8') as f: content = f.read() channel_blocks = re.split(r'\n\s*\n+', content.strip()) updated_channels = [] changes = 0 for block in channel_blocks: if block.strip(): channel = parse_channel_block(block) if channel: old_group = channel.get('Group', 'Uncategorized') detected = detect_country_from_channel( channel.get('Stream name', ''), channel.get('EPG id', ''), channel.get('Logo', '') ) if detected != "Uncategorized": channel['Group'] = detected if old_group != detected: changes += 1 log_message(f"Updated: '{channel.get('Stream name')}' โ†’ {detected}", "INFO") updated_channels.append(channel) if changes > 0: backup_name = f"{CHANNELS_FILE}.backup.{datetime.now().strftime('%Y%m%d_%H%M%S')}" try: import shutil shutil.copy2(CHANNELS_FILE, backup_name) except: pass with open(CHANNELS_FILE, 'w', encoding='utf-8') as f: for i, channel in enumerate(updated_channels): if i > 0: f.write("\n\n") f.write(convert_to_channels_txt_block(channel)) log_message(f"Updated {changes} channels with country detection", "INFO") def process_import(): """Process bulk M3U import with comprehensive filtering.""" settings = load_settings() group_overrides = load_group_overrides() if not os.path.exists(IMPORT_FILE): log_message(f"No {IMPORT_FILE} found, skipping import", "INFO") return [] log_message(f"Processing {IMPORT_FILE} for comprehensive bulk import...", "INFO") stats = { 'total_lines': 0, 'extinf_lines': 0, 'parsed': 0, 'valid': 0, 'filtered_adult': 0, 'filtered_invalid': 0, 'duplicates': 0, 'already_existed': 0, 'final_imported': 0 } imported_channels = [] try: with open(IMPORT_FILE, 'r', encoding='utf-8') as f: lines = f.readlines() stats['total_lines'] = len(lines) log_message(f"Processing {len(lines)} lines...", "INFO") i = 0 while i < len(lines): line = lines[i].strip() if line.startswith('#EXTINF:'): stats['extinf_lines'] += 1 if i + 1 < len(lines): extinf_line = line url_line = lines[i+1].strip() if url_line and not url_line.startswith('#'): channel = parse_m3u_entry(extinf_line, url_line) stats['parsed'] += 1 is_valid, reason = validate_channel(channel, settings) if not is_valid: if "adult" in reason.lower(): stats['filtered_adult'] += 1 else: stats['filtered_invalid'] += 1 i += 2 continue channel = apply_auto_country_detection(channel, group_overrides, settings) imported_channels.append(channel) stats['valid'] += 1 i += 2 else: i += 1 else: i += 1 if imported_channels: original_count = len(imported_channels) imported_channels = remove_duplicates(imported_channels, settings) stats['duplicates'] = original_count - len(imported_channels) existing_channels = [] if os.path.exists(CHANNELS_FILE): with open(CHANNELS_FILE, 'r', encoding='utf-8') as f: content = f.read() blocks = re.split(r'\n\s*\n+', content.strip()) for block in blocks: if block.strip(): existing_channels.append(parse_channel_block(block)) existing_sigs = {get_channel_signature(ch) for ch in existing_channels} new_channels = [] for channel in imported_channels: if get_channel_signature(channel) not in existing_sigs: new_channels.append(channel) else: stats['already_existed'] += 1 imported_channels = new_channels stats['final_imported'] = len(imported_channels) if imported_channels: with open(CHANNELS_FILE, 'a', encoding='utf-8') as f: for i, channel in enumerate(imported_channels): if i > 0 or os.path.getsize(CHANNELS_FILE) > 0: f.write("\n\n") f.write(convert_to_channels_txt_block(channel)) except Exception as e: log_message(f"Error processing import: {e}", "ERROR") log_message("=== COMPREHENSIVE IMPORT STATISTICS ===", "INFO") for key, value in stats.items(): log_message(f"{key.replace('_', ' ').title()}: {value}", "INFO") log_message("=== END STATISTICS ===", "INFO") if settings.get('auto_cleanup_import', True): try: os.remove(IMPORT_FILE) log_message(f"Cleaned up {IMPORT_FILE}", "INFO") except: pass return imported_channels def generate_playlist(): """Main enhanced playlist generation function.""" if os.path.exists(LOG_FILE): open(LOG_FILE, 'w').close() log_message("Starting comprehensive playlist generation...", "INFO") settings = load_settings() group_overrides = load_group_overrides() update_existing_channels_with_country_detection() imported_channels = process_import() log_message(f"Import returned {len(imported_channels)} channels", "INFO") if not os.path.exists(CHANNELS_FILE): log_message(f"Error: {CHANNELS_FILE} not found", "ERROR") return with open(CHANNELS_FILE, 'r', encoding='utf-8') as f: content = f.read() channel_blocks = re.split(r'\n\s*\n+', content.strip()) parsed_channels = [] for block in channel_blocks: if block.strip(): channel = parse_channel_block(block) if channel: parsed_channels.append(channel) log_message(f"Parsed {len(parsed_channels)} channels", "INFO") parsed_channels = remove_duplicates(parsed_channels, settings) if settings.get('sort_channels', True): parsed_channels.sort(key=lambda x: (x.get('Group', '').lower(), x.get('Stream name', '').lower())) m3u_lines = ["#EXTM3U"] valid_channels = 0 country_stats = {} for channel in parsed_channels: stream_name = channel.get('Stream name', '') group_name = channel.get('Group', 'Uncategorized') logo_url = channel.get('Logo', '') epg_id = channel.get('EPG id', '') stream_url = channel.get('Stream URL', '') if not stream_name or not stream_url: continue extinf_attrs = [ f'tvg-id="{epg_id}"', f'tvg-logo="{logo_url}"', f'group-title="{group_name}"', f'tvg-name="{stream_name}"' ] extinf_line = f"#EXTINF:-1 {' '.join(extinf_attrs)},{stream_name}" m3u_lines.append(extinf_line) m3u_lines.append(stream_url) valid_channels += 1 country_stats[group_name] = country_stats.get(group_name, 0) + 1 try: with open(PLAYLIST_FILE, 'w', encoding='utf-8') as f: for line in m3u_lines: f.write(line + '\n') log_message(f"Generated {PLAYLIST_FILE} with {valid_channels} channels", "INFO") sorted_stats = dict(sorted(country_stats.items(), key=lambda x: x[1], reverse=True)) log_message(f"Channels by country: {sorted_stats}", "INFO") except Exception as e: log_message(f"Error writing playlist: {e}", "ERROR") log_message("Comprehensive playlist generation complete", "INFO") if __name__ == "__main__": generate_playlist()