Update scripts/generate_playlist.py
Some checks are pending
Generate M3U Playlist with Auto-Organization / build-and-organize (push) Waiting to run

This commit is contained in:
stoney420 2025-06-29 04:02:20 +02:00
parent 5a6d293cd6
commit fa3f75a62e

View file

@ -6,6 +6,7 @@ Then filters to keep only legitimate countries
""" """
import os import os
import re
import shutil import shutil
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
@ -246,8 +247,589 @@ def is_valid_country_group(group_name):
return group_name in valid_countries return group_name in valid_countries
def clean_malformed_channel_name(raw_name):
    """Extract a clean channel name from a malformed EXTINF-style value.

    The cleanup strategies are tried in order and the first one that yields
    a usable name wins:

    1. Values starting with '".' that also contain 'tvg-logo=' keep the text
       after the last comma.
    2. Values containing 'group-title=' keep the text that follows the
       closing quote of the group-title attribute and its comma.
    3. Values starting with '.' and holding more than two double quotes are
       scanned from the last comma-separated part backwards for the first
       part that does not look like a domain, URL or attribute.
    4. Otherwise surrounding quotes/whitespace, a leading ".domain" prefix
       and trailing 'tvg-...' / 'group-title...' attributes are stripped.

    Args:
        raw_name: The raw "Stream name" value; may be empty or None.

    Returns:
        The cleaned name, or "Unknown Channel" when the input is empty,
        shorter than two characters, or nothing usable remains.
    """
    if not raw_name or len(raw_name) < 2:
        return "Unknown Channel"

    # Handle completely malformed entries like:
    # ".AB.ca",.AB.ca" tvg-logo="..." group-title="DaddyLive CA",CTV Canada [HD]"
    if raw_name.startswith('".') and 'tvg-logo=' in raw_name:
        # The actual channel name is whatever follows the last comma
        parts = raw_name.split(',')
        if len(parts) > 1:
            clean_name = parts[-1].strip().strip('"').strip()
            if clean_name:
                return clean_name

    # If it contains EXTINF data, extract the name after the group-title attribute
    if 'group-title=' in raw_name and ',' in raw_name:
        extinf_match = re.search(r'group-title="[^"]*",(.+)$', raw_name)
        if extinf_match:
            return extinf_match.group(1).strip().strip('"')

    # If it has extra quotes and domains, walk the comma-separated parts
    # backwards and take the first one that is not a domain/URL/attribute
    if raw_name.startswith('.') and raw_name.count('"') > 2:
        parts = raw_name.split(',')
        for part in reversed(parts):
            cleaned = part.strip().strip('"').strip()
            if cleaned and not cleaned.startswith('.') and len(cleaned) > 2:
                if not any(x in cleaned.lower() for x in ['http', 'tvg-', 'group-title', '.com', '.ca', '.us']):
                    return cleaned

    # Basic cleaning
    cleaned = raw_name.strip().strip('"').strip()

    # Remove leading dots and domains
    if cleaned.startswith('.'):
        cleaned = re.sub(r'^\.[\w.]+["\']*,?\s*', '', cleaned)

    # Remove trailing EXTINF attributes
    cleaned = re.sub(r'\s+tvg-.*$', '', cleaned)
    cleaned = re.sub(r'\s+group-title.*$', '', cleaned)

    return cleaned if cleaned and len(cleaned) > 1 else "Unknown Channel"
def extract_epg_from_malformed(raw_name):
    """Derive a generic EPG ID from a region domain embedded in malformed data.

    Searches for a two-letter uppercase region code followed by a ".ca" or
    ".us" suffix (for example ".AB.ca" or ".NY.us"). A ".ca" match anywhere
    in the value takes precedence over a ".us" match.

    Args:
        raw_name: The raw (malformed) "Stream name" string to inspect.

    Returns:
        "generic.<REGION>.ca" or "generic.<REGION>.us" when a region domain
        is found, otherwise an empty string.
    """
    # ".ca" is tried first so Canadian provinces win over US states.
    for suffix in ("ca", "us"):
        # The trailing \b stops longer words from matching, e.g.
        # "logo.HD.canada.png" or "x.TV.usa" must not yield an EPG id.
        region_match = re.search(rf'\.([A-Z]{{2}})\.{suffix}\b', raw_name)
        if region_match:
            return f"generic.{region_match.group(1)}.{suffix}"
    return ""
def load_channels(): def load_channels():
"""Load channels from channels.txt with enhanced parsing for malformed entries.""" """Load channels from channels.txt with integrated data cleanup."""
if not os.path.exists('channels.txt'): if not os.path.exists('channels.txt'):
print("❌ No channels.txt found") print("❌ No channels.txt found")
return [] return []
@ -257,32 +839,54 @@ def load_channels():
content = f.read() content = f.read()
channels = [] channels = []
cleaned_count = 0
print("🧹 Step 1: Data Cleanup (fixing malformed entries)")
print("-" * 50)
for block in content.split('\n\n'): for block in content.split('\n\n'):
if not block.strip(): if not block.strip():
continue continue
channel_data = {} channel_data = {}
for line in block.strip().split('\n'): for line in block.strip().split('\n'):
if '=' in line: if '=' in line:
key, value = line.split('=', 1) key, value = line.split('=', 1)
key = key.strip() key = key.strip()
value = value.strip() value = value.strip()
# Clean up malformed values (fix the quote issues we saw) if key == "Stream name":
if key == "Stream name" and value.startswith('"') and value.count('"') > 2: # Check if this is malformed
# Handle malformed entries like: ".AB.ca",.AB.ca" tvg-logo=... if (value.startswith('".') or 'tvg-logo=' in value or
# Extract just the actual channel name 'group-title=' in value or value.count('"') > 2):
parts = value.split(',')
if len(parts) > 1:
value = parts[-1].strip().strip('"')
channel_data[key] = value # Clean the malformed name
clean_name = clean_malformed_channel_name(value)
channel_data["Stream name"] = clean_name
# Only add channels with valid stream names # Extract EPG ID if missing
if channel_data.get('Stream name') and len(channel_data.get('Stream name', '')) > 1: if not channel_data.get("EPG id"):
extracted_epg = extract_epg_from_malformed(value)
if extracted_epg:
channel_data["EPG id"] = extracted_epg
cleaned_count += 1
if cleaned_count <= 10: # Show first 10 examples
print(f"🔧 Fixed: '{value[:40]}...''{clean_name}'")
else:
channel_data[key] = value
else:
channel_data[key] = value
# Only add channels with valid names
if (channel_data.get('Stream name') and
len(channel_data.get('Stream name', '')) > 1 and
channel_data.get('Stream name') != "Unknown Channel"):
channels.append(channel_data) channels.append(channel_data)
print(f"✅ Loaded {len(channels)} channels (with enhanced parsing)") print(f"✅ Data cleanup complete: {cleaned_count} entries fixed")
print(f"📊 Loaded {len(channels)} channels (after cleanup)")
return channels return channels
except Exception as e: except Exception as e: