import os
import re
import shutil
from datetime import datetime
from pathlib import Path


def clean_malformed_channel_name(raw_name):
    """Extract a clean channel name from a malformed ``Stream name`` value.

    Some entries carry leftover EXTINF attributes instead of a plain name,
    for example::

        ".AB.ca",.AB.ca" tvg-logo="..." group-title="DaddyLive CA",CTV Canada [HD]"

    The strategies below are tried in order; the first one that yields a
    non-empty name wins.

    Args:
        raw_name: The raw value read for the ``Stream name`` key.

    Returns:
        The cleaned channel name, or ``"Unknown Channel"`` when nothing
        usable (at least two characters) can be recovered.
    """
    if not raw_name or len(raw_name) < 2:
        return "Unknown Channel"

    # Fully malformed entry (starts with a quoted domain and carries a logo
    # attribute): the real name is whatever follows the last comma.
    if raw_name.startswith('".') and 'tvg-logo=' in raw_name:
        parts = raw_name.split(',')
        if len(parts) > 1:
            clean_name = parts[-1].strip().strip('"').strip()
            if clean_name:
                return clean_name

    # EXTINF-style data: the name is everything after the group-title
    # attribute's closing quote and comma, up to the end of the string.
    if 'group-title=' in raw_name and ',' in raw_name:
        extinf_match = re.search(r'group-title="[^"]*",(.+)$', raw_name)
        if extinf_match:
            return extinf_match.group(1).strip().strip('"')

    # Leading domain plus stray quotes: walk the comma-separated pieces from
    # the right and take the first one that does not look like a domain,
    # URL or attribute.
    if raw_name.startswith('.') and raw_name.count('"') > 2:
        noise_markers = ('http', 'tvg-', 'group-title', '.com', '.ca', '.us')
        for part in reversed(raw_name.split(',')):
            candidate = part.strip().strip('"').strip()
            if not candidate or candidate.startswith('.') or len(candidate) <= 2:
                continue
            lowered = candidate.lower()
            if not any(marker in lowered for marker in noise_markers):
                return candidate

    # Basic cleaning: surrounding whitespace and quotes.
    cleaned = raw_name.strip().strip('"').strip()

    # Remove a leading ".domain" token (with optional quotes/comma).
    if cleaned.startswith('.'):
        cleaned = re.sub(r'^\.[\w.]+["\']*,?\s*', '', cleaned)

    # Remove trailing EXTINF attributes; `$` anchors the cut to the end of
    # the string so everything from the attribute onwards is dropped.
    cleaned = re.sub(r'\s+tvg-.*$', '', cleaned)
    cleaned = re.sub(r'\s+group-title.*$', '', cleaned)

    return cleaned if cleaned and len(cleaned) > 1 else "Unknown Channel"


def extract_epg_from_malformed(raw_name):
    """Derive a generic EPG id from a domain fragment in malformed data.

    Looks for two-letter uppercase region codes followed by ``.ca`` (for
    example ``.AB.ca``, ``.ON.ca``) and, only if none is found, by ``.us``.

    Args:
        raw_name: The raw (uncleaned) ``Stream name`` value.

    Returns:
        ``"generic.<REGION>.<suffix>"`` for the first match, or ``""`` when
        the value is empty or contains no such domain pattern.
    """
    if not raw_name:
        return ""

    # Order matters: a `.ca` match anywhere takes priority over `.us`.
    for suffix in ('ca', 'us'):
        domain_match = re.search(r'\.([A-Z]{2})\.' + suffix, raw_name)
        if domain_match:
            return f"generic.{domain_match.group(1)}.{suffix}"

    return ""
- # Extract just the actual channel name - parts = value.split(',') - if len(parts) > 1: - value = parts[-1].strip().strip('"') - - channel_data[key] = value + if key == "Stream name": + # Check if this is malformed + if (value.startswith('".') or 'tvg-logo=' in value or + 'group-title=' in value or value.count('"') > 2): + + # Clean the malformed name + clean_name = clean_malformed_channel_name(value) + channel_data["Stream name"] = clean_name + + # Extract EPG ID if missing + if not channel_data.get("EPG id"): + extracted_epg = extract_epg_from_malformed(value) + if extracted_epg: + channel_data["EPG id"] = extracted_epg + + cleaned_count += 1 + if cleaned_count <= 10: # Show first 10 examples + print(f"๐Ÿ”ง Fixed: '{value[:40]}...' โ†’ '{clean_name}'") + else: + channel_data[key] = value + else: + channel_data[key] = value - # Only add channels with valid stream names - if channel_data.get('Stream name') and len(channel_data.get('Stream name', '')) > 1: + # Only add channels with valid names + if (channel_data.get('Stream name') and + len(channel_data.get('Stream name', '')) > 1 and + channel_data.get('Stream name') != "Unknown Channel"): channels.append(channel_data) - print(f"โœ… Loaded {len(channels)} channels (with enhanced parsing)") + print(f"โœ… Data cleanup complete: {cleaned_count} entries fixed") + print(f"๐Ÿ“Š Loaded {len(channels)} channels (after cleanup)") return channels except Exception as e: