"""Build an M3U playlist from channels.txt, with optional bulk import.

Workflow (see ``generate_playlist``):
  1. Merge new entries from ``bulk_import.m3u`` into ``channels.txt``.
  2. Re-read ``channels.txt``, assign each channel a country group
     (manual overrides first, then auto-detection).
  3. De-duplicate, sort and write ``playlist.m3u``.
"""

import json
import os
import re
from collections import Counter
from datetime import datetime
from urllib.parse import urlsplit

# --- Simple Configuration ---
CHANNELS_FILE = 'channels.txt'
PLAYLIST_FILE = 'playlist.m3u'
IMPORT_FILE = 'bulk_import.m3u'
LOG_FILE = 'playlist_update.log'

# Config files (optional)
SETTINGS_FILE = 'config/settings.json'
GROUP_OVERRIDES_FILE = 'config/group_overrides.json'

UNCATEGORIZED = "Uncategorized"


# --- Country detection tables ---

def _flag(iso_code):
    """Return the flag emoji (two regional-indicator symbols) for an ISO code."""
    return "".join(chr(0x1F1E6 + ord(ch) - ord("A")) for ch in iso_code.upper())


# ISO code -> human readable group name. "SA" doubles as the generic
# "Arabic" bucket, exactly like the original group labels did.
COUNTRY_NAMES = {
    "US": "United States",
    "GB": "United Kingdom",
    "CA": "Canada",
    "DE": "Germany",
    "FR": "France",
    "ES": "Spain",
    "IT": "Italy",
    "BR": "Brazil",
    "MX": "Mexico",
    "SA": "Arabic",
    "RU": "Russia",
    "CN": "China",
    "JP": "Japan",
    "KR": "South Korea",
    "AU": "Australia",
    "NL": "Netherlands",
    "IN": "India",
    "PT": "Portugal",
    "TR": "Turkey",
}

# ISO code -> group label written into the playlist, e.g. "<flag> Germany".
# Flags are generated from the code so the labels cannot be mis-encoded again.
COUNTRY_GROUPS = {
    code: f"{_flag(code)} {name}" for code, name in COUNTRY_NAMES.items()
}

# 1. Broadcaster names (most reliable signal).
BROADCASTERS_BY_COUNTRY = {
    "US": ["cbs", "nbc", "abc", "fox", "espn", "cnn", "hbo", "mtv",
           "discovery", "cartoon network", "showtime", "starz", "tnt", "tbs"],
    "GB": ["bbc", "itv", "channel 4", "sky", "e4", "film4", "more4", "dave"],
    "CA": ["cbc", "ctv", "global", "tvo", "aptn"],
    "DE": ["ard", "zdf", "rtl", "sat.1", "pro7", "vox", "kabel", "n24"],
    "FR": ["tf1", "france 2", "m6", "canal+", "arte"],
    "ES": ["tve", "antena 3", "telecinco", "cuatro", "la sexta"],
    "IT": ["rai", "mediaset", "canale 5", "italia 1", "rete 4"],
    "BR": ["globo", "band", "sbt"],
    "MX": ["televisa", "tv azteca"],
    # NOTE: the original table also listed "mbc" here, but the later
    # South Korea entry for the same key silently replaced it. The effective
    # behaviour (South Korea) is kept; use a manual override for MBC Arabia.
    "SA": ["al jazeera", "lbc"],
    "RU": ["rt", "channel one"],
    "CN": ["cctv", "phoenix"],
    "JP": ["nhk", "fuji"],
    "KR": ["kbs", "sbs", "mbc"],
    "AU": ["abc au", "seven", "nine"],
    "NL": ["npo", "rtl nl"],
}

# 2. Country names, demonyms and bare two-letter codes found in channel names.
COUNTRY_KEYWORDS = {
    "US": ["usa", "united states", "america", "american", "us"],
    "GB": ["uk", "united kingdom", "britain", "british", "england", "english"],
    "CA": ["canada", "canadian", "ca"],
    "DE": ["germany", "german", "deutschland", "deutsch", "de"],
    "FR": ["france", "french", "français", "fr"],
    "ES": ["spain", "spanish", "españa", "español", "es"],
    "IT": ["italy", "italian", "italia", "italiano", "it"],
    "BR": ["brazil", "brazilian", "brasil", "português", "br"],
    "MX": ["mexico", "mexican", "méxico", "mx"],
    "AU": ["australia", "australian", "aussie", "au"],
    "NL": ["netherlands", "dutch", "holland", "nederland", "nl"],
    "RU": ["russia", "russian", "россия", "ru"],
    "CN": ["china", "chinese", "中国", "cn"],
    "JP": ["japan", "japanese", "日本", "jp"],
    "KR": ["korea", "korean", "한국", "kr", "south korea"],
    "SA": ["arabic", "arab", "arabia", "arabiya", "middle east", "العربية",
           "al", "aljazeera"],
    "IN": ["india", "indian", "hindi", "bollywood", "zee", "star plus"],
    "PT": ["portugal", "portuguese", "pt"],
    "TR": ["turkey", "turkish", "türkiye", "tr"],
}

# 3./4. Hints inside EPG ids and logo URLs.
# Only real country-code TLDs are listed: the generic ".com" used to map to
# the United States, which mis-filed every channel whose logo merely lived on
# a .com host.
CCTLD_COUNTRY = {
    "us": "US", "uk": "GB", "gb": "GB", "ca": "CA", "de": "DE", "fr": "FR",
    "es": "ES", "it": "IT", "br": "BR", "mx": "MX", "au": "AU", "nl": "NL",
    "ru": "RU", "cn": "CN", "jp": "JP", "kr": "KR", "in": "IN", "pt": "PT",
    "tr": "TR",
}

# Name-style markers such as "canada.sportsnet" (the trailing dot is literal).
ID_MARKER_COUNTRY = {
    "usa.": "US", "british": "GB", "canada.": "CA", "german.": "DE",
    "france.": "FR", "spain.": "ES", "italy.": "IT", "brazil.": "BR",
    "mexico.": "MX", "australia.": "AU", "netherlands.": "NL",
    "russia.": "RU", "china.": "CN", "japan.": "JP", "korea.": "KR",
    "india.": "IN", "portugal.": "PT", "turkey.": "TR",
}


def _invert(table):
    """Turn ``{country: [keyword, ...]}`` into ``{keyword: country}``."""
    return {kw: country for country, kws in table.items() for kw in kws}


def _compile_keywords(keywords):
    """Compile keywords into one regex that only matches whole words.

    * Longest keywords come first so "abc au" beats "abc".
    * A keyword may not be glued to a preceding ASCII letter/digit, which
      stops "rt" matching inside "eurosport" or "al" inside "national".
    * A keyword ending in a letter may be followed by digits ("bbc1",
      "itv2"); one ending in a digit may not ("e4" must not match "e45").
    * Only ASCII letters/digits count as word characters, so CJK, Cyrillic
      and Arabic keywords still match inside unspaced text.
    """
    alternatives = []
    for kw in sorted(keywords, key=len, reverse=True):
        tail = r"(?![a-z0-9])" if kw[-1].isdigit() else r"(?![a-z])"
        alternatives.append(re.escape(kw) + tail)
    return re.compile(r"(?<![a-z0-9])(?:" + "|".join(alternatives) + ")")


_BROADCASTER_COUNTRY = _invert(BROADCASTERS_BY_COUNTRY)
_KEYWORD_COUNTRY = _invert(COUNTRY_KEYWORDS)
_BROADCASTER_RE = _compile_keywords(_BROADCASTER_COUNTRY)
_KEYWORD_RE = _compile_keywords(_KEYWORD_COUNTRY)

# ".uk" in "bbcone.uk@sd" but not ".in" in "channel.info".
_CCTLD_RE = re.compile(r"\.([a-z]{2})(?![a-z0-9])")
_ID_MARKER_RE = re.compile(
    r"(?<![a-z0-9])(?:"
    + "|".join(re.escape(m) for m in sorted(ID_MARKER_COUNTRY, key=len, reverse=True))
    + ")"
)

# key="value" attributes on an #EXTINF line.
_M3U_ATTR_RE = re.compile(r'([\w-]+)="([^"]*)"')


def log_message(message, level="INFO"):
    """Append a timestamped message to LOG_FILE and echo it to stdout.

    Logging is best effort: a failure to write the log file is reported on
    stdout and never raised to the caller.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    formatted_message = f"[{timestamp}] {level}: {message}"
    try:
        with open(LOG_FILE, 'a', encoding='utf-8') as f:
            f.write(formatted_message + "\n")
    except (OSError, ValueError) as e:
        print(f"ERROR: Could not write to log: {e}")
    print(formatted_message)


def load_settings():
    """Return the settings dict: defaults overlaid with SETTINGS_FILE.

    A missing, unreadable or malformed settings file yields the defaults.
    """
    default_settings = {
        "remove_duplicates": True,
        "sort_channels": True,
        # NOTE(review): accepted for compatibility, but nothing in this file
        # creates a backup yet.
        "backup_before_import": True,
        "auto_cleanup_import": True,
        "auto_detect_country": True,
    }
    if os.path.exists(SETTINGS_FILE):
        try:
            with open(SETTINGS_FILE, 'r', encoding='utf-8') as f:
                settings = json.load(f)
            if not isinstance(settings, dict):
                raise ValueError("settings file must contain a JSON object")
            return {**default_settings, **settings}
        except (OSError, ValueError) as e:
            log_message(f"Could not load settings, using defaults: {e}", "WARNING")
    return default_settings


def load_group_overrides():
    """Return the ``{name fragment: group}`` mapping from GROUP_OVERRIDES_FILE.

    Returns an empty dict when the file is missing, unreadable or does not
    contain a JSON object.
    """
    if os.path.exists(GROUP_OVERRIDES_FILE):
        try:
            with open(GROUP_OVERRIDES_FILE, 'r', encoding='utf-8') as f:
                overrides = json.load(f)
            if not isinstance(overrides, dict):
                raise ValueError("group overrides file must contain a JSON object")
            return overrides
        except (OSError, ValueError) as e:
            log_message(f"Could not load group overrides: {e}", "WARNING")
    return {}


def _normalize(text):
    """Lower-case *text* and collapse runs of whitespace; None becomes ''."""
    return " ".join(str(text or "").lower().split())


def _country_from_text_id(text):
    """Return a country code for name-style markers in *text*, else None."""
    marker = _ID_MARKER_RE.search(text)
    return ID_MARKER_COUNTRY[marker.group(0)] if marker else None


def _country_from_epg_id(epg_id):
    """Return a country code derived from an EPG id, else None."""
    epg_lower = _normalize(epg_id)
    if not epg_lower:
        return None
    # The country code is normally the last label ("bbcone.co.uk"), so the
    # right-most recognised code wins.
    for tld in reversed(_CCTLD_RE.findall(epg_lower)):
        if tld in CCTLD_COUNTRY:
            return CCTLD_COUNTRY[tld]
    return _country_from_text_id(epg_lower)


def _country_from_logo_url(logo_url):
    """Return a country code derived from a logo URL, else None."""
    logo_lower = _normalize(logo_url)
    if not logo_lower:
        return None
    try:
        hostname = urlsplit(logo_lower).hostname or ""
    except ValueError:
        # Malformed URL (e.g. unbalanced IPv6 brackets): no host to inspect.
        hostname = ""
    # Only the host's TLD is meaningful; ".de" somewhere in a path is not.
    tld = hostname.rpartition(".")[2]
    if tld in CCTLD_COUNTRY:
        return CCTLD_COUNTRY[tld]
    return _country_from_text_id(logo_lower)


def detect_country_from_channel(channel_name, epg_id="", logo_url=""):
    """Auto-detect the country group of a channel.

    Signals are tried from most to least reliable and the first hit wins:
      1. broadcaster names in the channel name ("bbc", "zdf", ...),
      2. country names / demonyms / two-letter codes in the channel name,
      3. country-code TLD or country marker in the EPG id,
      4. country-code TLD of the logo host, or a country marker in the URL.

    Args:
        channel_name: Display name of the channel.
        epg_id: Optional EPG id (e.g. "BBCOne.uk").
        logo_url: Optional logo URL.

    Returns:
        The group label (e.g. "<flag> Germany") or "Uncategorized".
    """
    name = _normalize(channel_name)
    for pattern, lookup in (
        (_BROADCASTER_RE, _BROADCASTER_COUNTRY),
        (_KEYWORD_RE, _KEYWORD_COUNTRY),
    ):
        hit = pattern.search(name)
        if hit:
            return COUNTRY_GROUPS[lookup[hit.group(0)]]

    code = _country_from_epg_id(epg_id) or _country_from_logo_url(logo_url)
    return COUNTRY_GROUPS[code] if code else UNCATEGORIZED


def apply_auto_country_detection(channel, group_overrides, settings):
    """Assign ``channel['Group']`` from manual overrides or auto-detection.

    Priority: a manual override whose key occurs (case-insensitively) in the
    stream name, then auto-detection when ``auto_detect_country`` is enabled.
    If neither yields a group, placeholder groups ('', 'Unknown', 'Other')
    are normalised to "Uncategorized" and any other existing group is kept.

    The channel dict is modified in place and also returned.
    """
    stream_name = channel.get('Stream name', '')
    stream_name_lower = stream_name.lower()

    # Manual overrides have the highest priority. An empty key would be a
    # substring of every name, so it is ignored.
    for key, new_group in group_overrides.items():
        if key and key.lower() in stream_name_lower:
            channel['Group'] = new_group
            log_message(f"Manual override: '{stream_name}' → {new_group}", "DEBUG")
            return channel

    if settings.get('auto_detect_country', True):
        detected_country = detect_country_from_channel(
            stream_name, channel.get('EPG id', ''), channel.get('Logo', '')
        )
        if detected_country != UNCATEGORIZED:
            channel['Group'] = detected_country
            log_message(f"Auto-detected: '{stream_name}' → {detected_country}", "INFO")
            return channel

    if channel.get('Group', UNCATEGORIZED) in ('', 'Unknown', 'Other'):
        channel['Group'] = UNCATEGORIZED
    return channel


def parse_channel_block(block):
    """Parse one ``key = value`` block from channels.txt into a dict.

    Lines without '=' are ignored; only the first '=' splits, so values
    (such as URLs with query strings) may contain '=' themselves.
    """
    channel_data = {}
    for line in block.strip().split('\n'):
        if '=' in line:
            key, value = line.split('=', 1)
            channel_data[key.strip()] = value.strip()
    return channel_data


def parse_m3u_entry(extinf_line, url_line):
    """Parse an ``#EXTINF`` line plus its URL line into a channel dict.

    The display name is everything after the comma that follows the
    attributes. Attributes are removed before looking for that comma, so
    commas inside quoted attribute values or inside the name are preserved.
    """
    attrs = dict(_M3U_ATTR_RE.findall(extinf_line))
    _, _, display_name = _M3U_ATTR_RE.sub('', extinf_line).partition(',')
    return {
        'EPG id': attrs.get('tvg-id', ''),
        'Logo': attrs.get('tvg-logo', ''),
        'Group': attrs.get('group-title', UNCATEGORIZED),
        'TVG Name': attrs.get('tvg-name', ''),
        'Stream name': display_name.strip() or 'Unknown Channel',
        'Stream URL': url_line.strip(),
    }


def convert_to_channels_txt_block(channel_data):
    """Render a channel dict as a channels.txt block (no trailing newline)."""
    return "\n".join([
        f"Group = {channel_data.get('Group', UNCATEGORIZED)}",
        f"Stream name = {channel_data.get('Stream name', 'Unknown Channel')}",
        f"Logo = {channel_data.get('Logo', '')}",
        f"EPG id = {channel_data.get('EPG id', '')}",
        f"Stream URL = {channel_data.get('Stream URL', '')}",
    ])


def get_channel_signature(channel):
    """Return the ``name|url`` key used for duplicate detection.

    The name is lower-cased, whitespace-collapsed and stripped of
    punctuation; the URL is lower-cased and trimmed.
    """
    stream_name = channel.get('Stream name', '').strip().lower()
    stream_url = channel.get('Stream URL', '').strip().lower()
    stream_name_clean = re.sub(r'\s+', ' ', stream_name)
    stream_name_clean = re.sub(r'[^\w\s]', '', stream_name_clean)
    return f"{stream_name_clean}|{stream_url}"


def remove_duplicates(channels, settings):
    """Return *channels* without duplicates, keeping first occurrences.

    Does nothing (beyond logging) when ``remove_duplicates`` is disabled.
    """
    if not settings.get('remove_duplicates', True):
        log_message("Duplicate removal disabled", "INFO")
        return channels

    seen_signatures = set()
    unique_channels = []
    for channel in channels:
        signature = get_channel_signature(channel)
        if signature not in seen_signatures:
            seen_signatures.add(signature)
            unique_channels.append(channel)

    duplicate_count = len(channels) - len(unique_channels)
    if duplicate_count > 0:
        log_message(f"Removed {duplicate_count} duplicate channels", "INFO")
    else:
        log_message("No duplicates found", "INFO")
    return unique_channels


def _iter_m3u_entries(lines):
    """Yield ``(extinf_line, url_line)`` pairs from raw M3U lines.

    Blank lines and other '#' directives (e.g. #EXTVLCOPT) between an
    #EXTINF line and its URL are skipped instead of discarding the entry.
    An #EXTINF line that is followed by another #EXTINF has no URL and is
    dropped.
    """
    pending_extinf = None
    for raw_line in lines:
        line = raw_line.strip()
        if line.startswith('#EXTINF:'):
            pending_extinf = line
        elif not line or line.startswith('#'):
            continue
        elif pending_extinf is not None:
            yield pending_extinf, line
            pending_extinf = None


def _read_channel_blocks(path):
    """Read a channels.txt style file and return its non-empty channel dicts."""
    with open(path, 'r', encoding='utf-8') as f:
        content = f.read()
    # Blocks are separated by one or more blank lines.
    blocks = re.split(r'\n\s*\n+', content.strip())
    parsed = (parse_channel_block(block) for block in blocks if block.strip())
    return [channel for channel in parsed if channel]


def process_import():
    """Merge new channels from IMPORT_FILE into CHANNELS_FILE.

    Channels already present (by signature) are skipped. On success the
    import file is removed when ``auto_cleanup_import`` is enabled; on error
    it is left in place so nothing is lost.

    Returns:
        The list of channel dicts collected for import (empty when there is
        no import file).
    """
    settings = load_settings()
    group_overrides = load_group_overrides()

    if not os.path.exists(IMPORT_FILE):
        log_message(f"No {IMPORT_FILE} found, skipping import", "INFO")
        return []

    log_message(f"Processing {IMPORT_FILE}...", "INFO")
    imported_channels = []

    # Deliberately broad: the import is best effort and must never stop
    # playlist generation.
    try:
        # utf-8-sig also accepts plain UTF-8 and strips a leading BOM.
        with open(IMPORT_FILE, 'r', encoding='utf-8-sig') as f:
            lines = f.readlines()
        log_message(f"Found {len(lines)} lines in import file", "INFO")

        for extinf_line, url_line in _iter_m3u_entries(lines):
            channel_data = parse_m3u_entry(extinf_line, url_line)
            channel_data = apply_auto_country_detection(
                channel_data, group_overrides, settings
            )
            if channel_data.get('Stream name') and channel_data.get('Stream URL'):
                imported_channels.append(channel_data)
        log_message(f"Parsed {len(imported_channels)} channels from import", "INFO")

        if imported_channels:
            imported_channels = remove_duplicates(imported_channels, settings)

            existing_channels = []
            if os.path.exists(CHANNELS_FILE):
                existing_channels = _read_channel_blocks(CHANNELS_FILE)
            existing_signatures = {
                get_channel_signature(ch) for ch in existing_channels
            }
            imported_channels = [
                ch for ch in imported_channels
                if get_channel_signature(ch) not in existing_signatures
            ]
            log_message(f"Final import: {len(imported_channels)} new channels", "INFO")

        if imported_channels:
            # Separate from existing content with a blank line.
            needs_separator = (
                os.path.exists(CHANNELS_FILE) and os.path.getsize(CHANNELS_FILE) > 0
            )
            blocks = [convert_to_channels_txt_block(ch) for ch in imported_channels]
            with open(CHANNELS_FILE, 'a', encoding='utf-8') as f:
                if needs_separator:
                    f.write("\n\n")
                f.write("\n\n".join(blocks))
            log_message(f"Successfully imported {len(imported_channels)} channels", "INFO")
        else:
            log_message("No new channels to import", "INFO")
    except Exception as e:
        log_message(f"Error processing import: {e}", "ERROR")
        return imported_channels

    if settings.get('auto_cleanup_import', True):
        try:
            os.remove(IMPORT_FILE)
            log_message(f"Cleaned up {IMPORT_FILE}", "INFO")
        except OSError as e:
            log_message(f"Could not remove {IMPORT_FILE}: {e}", "WARNING")

    return imported_channels


def _m3u_attr(value):
    """Make *value* safe inside a double-quoted #EXTINF attribute."""
    return str(value).replace('"', "'")


def generate_playlist():
    """Run the import, then regenerate PLAYLIST_FILE from CHANNELS_FILE."""
    # Start every run with a fresh log.
    if os.path.exists(LOG_FILE):
        with open(LOG_FILE, 'w', encoding='utf-8'):
            pass

    log_message("Starting playlist generation...", "INFO")

    settings = load_settings()
    group_overrides = load_group_overrides()

    imported_channels = process_import()
    log_message(f"Import returned {len(imported_channels)} channels", "INFO")

    if not os.path.exists(CHANNELS_FILE):
        log_message(f"Error: {CHANNELS_FILE} not found", "ERROR")
        return

    parsed_channels = [
        apply_auto_country_detection(channel, group_overrides, settings)
        for channel in _read_channel_blocks(CHANNELS_FILE)
    ]
    log_message(f"Parsed {len(parsed_channels)} channels", "INFO")

    parsed_channels = remove_duplicates(parsed_channels, settings)

    if settings.get('sort_channels', True):
        parsed_channels.sort(
            key=lambda ch: (ch.get('Group', '').lower(), ch.get('Stream name', '').lower())
        )
        log_message("Channels sorted by country and name", "INFO")

    m3u_lines = ["#EXTM3U"]
    country_stats = Counter()

    for channel in parsed_channels:
        stream_name = channel.get('Stream name', '')
        stream_url = channel.get('Stream URL', '')
        # An entry without a name or URL cannot be played; leave it out.
        if not stream_name or not stream_url:
            continue

        group_name = channel.get('Group', UNCATEGORIZED)
        extinf_attrs = [
            f'tvg-id="{_m3u_attr(channel.get("EPG id", ""))}"',
            f'tvg-logo="{_m3u_attr(channel.get("Logo", ""))}"',
            f'group-title="{_m3u_attr(group_name)}"',
            f'tvg-name="{_m3u_attr(stream_name)}"',
        ]
        m3u_lines.append(f"#EXTINF:-1 {' '.join(extinf_attrs)},{stream_name}")
        m3u_lines.append(stream_url)
        country_stats[group_name] += 1

    valid_channels = sum(country_stats.values())

    try:
        with open(PLAYLIST_FILE, 'w', encoding='utf-8') as f:
            f.write("\n".join(m3u_lines) + "\n")
        log_message(f"Generated {PLAYLIST_FILE} with {valid_channels} channels", "INFO")
        log_message(f"Channels by country: {dict(country_stats.most_common())}", "INFO")
    except OSError as e:
        log_message(f"Error writing playlist: {e}", "ERROR")

    log_message("Playlist generation complete", "INFO")


if __name__ == "__main__":
    generate_playlist()