Update scripts/generate_playlist.py
Some checks failed
Generate M3U Playlist with Auto-Organization / build-and-organize (push) Has been cancelled

This commit is contained in:
stoney420 2025-06-29 04:19:39 +02:00
parent 5021141f85
commit 241856217c

View file

@ -1,6 +1,6 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
IPTV Enhanced Country Detection - Updated Version IPTV Enhanced Country Detection - Complete Working Version
Uses 3-point analysis: Channel Name + EPG ID + Logo URL Uses 3-point analysis: Channel Name + EPG ID + Logo URL
Then filters to keep only legitimate countries Then filters to keep only legitimate countries
""" """
@ -16,6 +16,70 @@ script_dir = Path(__file__).parent
root_dir = script_dir.parent root_dir = script_dir.parent
os.chdir(root_dir) os.chdir(root_dir)
def clean_malformed_channel_name(raw_name):
"""Extract clean channel name from malformed EXTINF data."""
if not raw_name or len(raw_name) < 2:
return "Unknown Channel"
# Handle completely malformed entries like:
# ".AB.ca",.AB.ca" tvg-logo="..." group-title="DaddyLive CA",CTV Canada [HD]"
if raw_name.startswith('".') and 'tvg-logo=' in raw_name:
# Extract the actual channel name after the last comma
parts = raw_name.split(',')
if len(parts) > 1:
clean_name = parts[-1].strip().strip('"').strip()
if clean_name:
return clean_name
# If it contains EXTINF data, extract the name
if 'group-title=' in raw_name and ',' in raw_name:
extinf_match = re.search(r'group-title="[^"]*",(.+)$', raw_name)
if extinf_match:
return extinf_match.group(1).strip().strip('"')
# If it has extra quotes and domains, clean them
if raw_name.startswith('.') and raw_name.count('"') > 2:
parts = raw_name.split(',')
for part in reversed(parts):
cleaned = part.strip().strip('"').strip()
if cleaned and not cleaned.startswith('.') and len(cleaned) > 2:
if not any(x in cleaned.lower() for x in ['http', 'tvg-', 'group-title', '.com', '.ca', '.us']):
return cleaned
# Basic cleaning
cleaned = raw_name.strip().strip('"').strip()
# Remove leading dots and domains
if cleaned.startswith('.'):
cleaned = re.sub(r'^\.[\w.]+["\']*,?\s*', '', cleaned)
# Remove trailing EXTINF attributes
cleaned = re.sub(r'\s+tvg-.*$', '', cleaned)
cleaned = re.sub(r'\s+group-title.*$', '', cleaned)
return cleaned if cleaned and len(cleaned) > 1 else "Unknown Channel"
def extract_epg_from_malformed(raw_name):
"""Extract EPG ID from malformed data."""
# Look for domain patterns like .AB.ca, .ON.ca, etc.
domain_match = re.search(r'\.([A-Z]{2})\.ca', raw_name)
if domain_match:
province = domain_match.group(1)
return f"generic.{province}.ca"
# Look for .us domains
domain_match = re.search(r'\.([A-Z]{2})\.us', raw_name)
if domain_match:
state = domain_match.group(1)
return f"generic.{state}.us"
return ""
def detect_country_from_channel_content(channel_name, epg_id="", logo_url="", stream_url=""): def detect_country_from_channel_content(channel_name, epg_id="", logo_url="", stream_url=""):
""" """
Enhanced country detection using 3-point analysis Enhanced country detection using 3-point analysis
@ -159,21 +223,56 @@ def detect_country_from_channel_content(channel_name, epg_id="", logo_url="", st
if pattern in logo_url.lower(): if pattern in logo_url.lower():
return country return country
# STEP 5: Enhanced broadcaster patterns # STEP 5: Enhanced broadcaster patterns with more comprehensive coverage
broadcaster_patterns = { broadcaster_patterns = {
"🇨🇦 Canada": [ "🇨🇦 Canada": [
"cbc", "tsn", "ctv", "global", "sportsnet", "citytv", "aptn", "teletoon", "ytv", # Major networks
"discovery canada", "history canada", "slice", "w network", "oln", "hgtv canada", "cbc", "ctv", "global", "citytv", "aptn", "omni", "tvo",
"food network canada", "showcase", "crave", "super channel", "hollywood suite" # Sports
"tsn", "sportsnet", "rds", "rds info",
# Specialty
"teletoon", "ytv", "treehouse", "family channel", "oln", "cottage life",
"discovery canada", "history canada", "slice", "w network", "hgtv canada",
"food network canada", "showcase", "crave", "super channel", "hollywood suite",
# French Canadian
"ici", "télé-québec", "tva", "noovo", "canal d", "canal vie",
# Regional identifiers
"calgary", "vancouver", "toronto", "winnipeg", "montreal", "ottawa", "halifax",
"edmonton", "saskatoon", "regina", "victoria", "quebec city"
], ],
"🇺🇸 United States": [ "🇺🇸 United States": [
"cbs", "nbc", "abc", "fox", "cnn", "espn", "amc", "mtv", "comedy central", # Major networks
"discovery usa", "history usa", "tlc usa", "hgtv usa", "food network usa", "paramount", "cbs", "nbc", "abc", "fox", "cw", "pbs", "ion", "mynetworktv",
"nickelodeon usa", "cartoon network usa", "disney usa", "lifetime", "e!", "bravo usa" # News
"cnn", "fox news", "msnbc", "cnbc", "bloomberg", "newsmax", "oann",
# Sports
"espn", "fox sports", "nfl network", "mlb network", "nba tv", "nhl network",
# Premium
"hbo", "showtime", "starz", "cinemax", "epix",
# Cable networks
"mtv", "vh1", "comedy central", "cartoon network", "nickelodeon", "disney channel",
"discovery", "history", "tlc", "hgtv", "food network", "travel channel",
"lifetime", "hallmark", "e!", "bravo", "oxygen", "syfy", "usa network",
"tnt", "tbs", "fx", "fxx", "amc", "ifc", "tcm", "turner classic",
# But exclude specifically Canadian versions
"usa", "america", "american", "united states"
], ],
"🇬🇧 United Kingdom": [ "🇬🇧 United Kingdom": [
"bbc", "itv", "channel 4", "channel 5", "sky", "dave", "really", "yesterday", # BBC (but not BBC America)
"discovery uk", "history uk", "tlc uk", "living", "alibi", "gold", "drama" "bbc one", "bbc two", "bbc three", "bbc four", "bbc news", "bbc iplayer",
"bbc scotland", "bbc wales", "bbc northern ireland", "bbc parliament",
"bbc comedy", "bbc drama", "bbc earth", "bbc world news",
# ITV
"itv", "itv2", "itv3", "itv4", "itv be", "itvx",
# Channel 4
"channel 4", "channel 5", "e4", "more4", "film4",
# Sky
"sky", "sky news", "sky sports", "sky one", "sky two", "sky atlantic",
# Other UK
"dave", "really", "yesterday", "drama", "alibi", "gold", "living",
"discovery uk", "history uk", "tlc uk", "quest", "dmax uk",
# UK specific terms
"british", "england", "scotland", "wales", "northern ireland", "uk"
], ],
"🇩🇪 Germany": [ "🇩🇪 Germany": [
"ard", "zdf", "rtl", "pro7", "sat.1", "vox", "kabel eins", "super rtl", "rtl2", "ard", "zdf", "rtl", "pro7", "sat.1", "vox", "kabel eins", "super rtl", "rtl2",
@ -222,9 +321,15 @@ def detect_country_from_channel_content(channel_name, epg_id="", logo_url="", st
] ]
} }
# Enhanced pattern matching with conflict resolution
for country, keywords in broadcaster_patterns.items(): for country, keywords in broadcaster_patterns.items():
for keyword in keywords: for keyword in keywords:
if keyword in all_text: if keyword in all_text:
# Special handling for conflicting patterns
if country == "🇺🇸 United States" and any(ca_term in all_text for ca_term in [".ca", "canada", "canadian"]):
continue # Skip US assignment if Canadian indicators present
if country == "🇬🇧 United Kingdom" and "america" in all_text:
continue # Skip UK assignment if "america" is present
return country return country
return "Uncategorized" return "Uncategorized"
@ -247,26 +352,70 @@ def is_valid_country_group(group_name):
return group_name in valid_countries return group_name in valid_countries
def clean_malformed_channel_name(raw_name): def load_channels():
"""Extract clean channel name from malformed EXTINF data.""" """Load channels from channels.txt with integrated data cleanup."""
if not os.path.exists('channels.txt'):
print("❌ No channels.txt found")
return []
if not raw_name or len(raw_name) < 2: try:
return "Unknown Channel" with open('channels.txt', 'r', encoding='utf-8') as f:
content = f.read()
# Handle completely malformed entries like:
# ".AB.ca",.AB.ca" tvg-logo="..." group-title="DaddyLive CA",CTV Canada [HD]" channels = []
cleaned_count = 0
if raw_name.startswith('".') and 'tvg-logo=' in raw_name:
# Extract the actual channel name after the last comma print("🧹 Step 1: Data Cleanup (fixing malformed entries)")
parts = raw_name.split(',') print("-" * 50)
if len(parts) > 1:
clean_name = parts[-1].strip().strip('"').strip() for block in content.split('\n\n'):
if clean_name: if not block.strip():
return clean_name continue
# If it contains EXTINF data, extract the name channel_data = {}
if 'group-title=' in raw_name and ',' in raw_name:
extinf_match = re.search(r'group-title="[^"]*",(.+) for line in block.strip().split('\n'):
if '=' in line:
key, value = line.split('=', 1)
key = key.strip()
value = value.strip()
if key == "Stream name":
# Check if this is malformed
if (value.startswith('".') or 'tvg-logo=' in value or
'group-title=' in value or value.count('"') > 2):
# Clean the malformed name
clean_name = clean_malformed_channel_name(value)
channel_data["Stream name"] = clean_name
# Extract EPG ID if missing
if not channel_data.get("EPG id"):
extracted_epg = extract_epg_from_malformed(value)
if extracted_epg:
channel_data["EPG id"] = extracted_epg
cleaned_count += 1
if cleaned_count <= 10: # Show first 10 examples
print(f"🔧 Fixed: '{value[:40]}...''{clean_name}'")
else:
channel_data[key] = value
else:
channel_data[key] = value
# Only add channels with valid names
if (channel_data.get('Stream name') and
len(channel_data.get('Stream name', '')) > 1 and
channel_data.get('Stream name') != "Unknown Channel"):
channels.append(channel_data)
print(f"✅ Data cleanup complete: {cleaned_count} entries fixed")
print(f"📊 Loaded {len(channels)} channels (after cleanup)")
return channels
except Exception as e:
print(f"❌ Error loading channels: {e}")
return []
def reorganize_channels(channels): def reorganize_channels(channels):
@ -451,609 +600,6 @@ def main():
return True return True
if __name__ == "__main__":
success = main()
exit(0 if success else 1), raw_name)
if extinf_match:
return extinf_match.group(1).strip().strip('"')
# If it has extra quotes and domains, clean them
if raw_name.startswith('.') and raw_name.count('"') > 2:
parts = raw_name.split(',')
for part in reversed(parts):
cleaned = part.strip().strip('"').strip()
if cleaned and not cleaned.startswith('.') and len(cleaned) > 2:
if not any(x in cleaned.lower() for x in ['http', 'tvg-', 'group-title', '.com', '.ca', '.us']):
return cleaned
# Basic cleaning
cleaned = raw_name.strip().strip('"').strip()
# Remove leading dots and domains
if cleaned.startswith('.'):
cleaned = re.sub(r'^\.[\w.]+["\']*,?\s*', '', cleaned)
# Remove trailing EXTINF attributes
cleaned = re.sub(r'\s+tvg-.*
def reorganize_channels(channels):
"""Enhanced reorganization with 3-point analysis."""
print("🔍 Enhanced Country Detection with 3-Point Analysis")
print("📊 Analyzing: Channel Name + EPG ID + Logo URL")
print("-" * 60)
changes = 0
stats = {
'country_detected': 0,
'sent_to_uncategorized': 0,
'kept_existing_country': 0
}
country_counts = {}
for channel in channels:
old_group = channel.get('Group', 'Uncategorized')
stream_name = channel.get('Stream name', '')
epg_id = channel.get('EPG id', '')
logo = channel.get('Logo', '')
stream_url = channel.get('Stream URL', '')
# Detect country using enhanced 3-point analysis
detected_country = detect_country_from_channel_content(stream_name, epg_id, logo, stream_url)
# Decide final group
if is_valid_country_group(old_group) and detected_country != "Uncategorized":
# Keep existing valid country
final_group = old_group
stats['kept_existing_country'] += 1
elif detected_country != "Uncategorized":
# Use detected country
final_group = detected_country
stats['country_detected'] += 1
if old_group != detected_country:
print(f"🔍 Fixed: '{stream_name}' {old_group}{detected_country}")
changes += 1
else:
# Send to Uncategorized
final_group = "Uncategorized"
stats['sent_to_uncategorized'] += 1
if old_group != "Uncategorized":
print(f"📱 Platform: '{stream_name}' → Uncategorized")
changes += 1
channel['Group'] = final_group
country_counts[final_group] = country_counts.get(final_group, 0) + 1
print(f"\n📊 PROCESSING RESULTS:")
print(f"✅ Changes made: {changes}")
print(f"🔍 Country detected: {stats['country_detected']}")
print(f"✅ Kept existing countries: {stats['kept_existing_country']}")
print(f"📱 Sent to Uncategorized: {stats['sent_to_uncategorized']}")
print(f"\n🌍 FINAL GROUP DISTRIBUTION:")
sorted_countries = sorted(country_counts.items(), key=lambda x: (x[0] == "Uncategorized", -x[1]))
for country, count in sorted_countries:
print(f" {country}: {count} channels")
return channels
def save_channels(channels):
"""Save channels to file."""
# Backup
if os.path.exists('channels.txt'):
backup = f"channels_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
shutil.copy2('channels.txt', backup)
print(f"📋 Backup: {backup}")
try:
with open('channels.txt', 'w', encoding='utf-8') as f:
for i, channel in enumerate(channels):
if i > 0:
f.write("\n\n")
f.write(f"Group = {channel.get('Group', 'Uncategorized')}\n")
f.write(f"Stream name = {channel.get('Stream name', 'Unknown')}\n")
f.write(f"Logo = {channel.get('Logo', '')}\n")
f.write(f"EPG id = {channel.get('EPG id', '')}\n")
f.write(f"Stream URL = {channel.get('Stream URL', '')}\n")
print(f"✅ Saved {len(channels)} channels")
return True
except Exception as e:
print(f"❌ Save error: {e}")
return False
def generate_m3u(channels):
"""Generate M3U playlist."""
try:
with open('playlist.m3u', 'w', encoding='utf-8') as f:
f.write('#EXTM3U\n')
for channel in channels:
name = channel.get('Stream name', '')
group = channel.get('Group', 'Uncategorized')
logo = channel.get('Logo', '')
epg_id = channel.get('EPG id', '')
url = channel.get('Stream URL', '')
if name and url:
f.write(f'#EXTINF:-1 group-title="{group}"')
if logo:
f.write(f' tvg-logo="{logo}"')
if epg_id:
f.write(f' tvg-id="{epg_id}"')
f.write(f',{name}\n{url}\n')
print("✅ Generated playlist.m3u")
return True
except Exception as e:
print(f"❌ M3U error: {e}")
return False
def main():
"""Main function."""
print("🎯 Enhanced IPTV Country Detection - 3-Point Analysis")
print("=" * 70)
print("🔍 Analyzes: Channel Name + EPG ID + Logo URL")
print("🎯 Filters: Only countries remain, streaming services → Uncategorized")
print("=" * 70)
channels = load_channels()
if not channels:
return False
# Enhanced reorganization
channels = reorganize_channels(channels)
# Sort: Countries first (alphabetically), then Uncategorized last
channels.sort(key=lambda x: (
"zzz" if x.get('Group') == "Uncategorized" else x.get('Group', ''),
x.get('Stream name', '')
))
# Save and generate
if not save_channels(channels):
return False
if not generate_m3u(channels):
return False
# Clear import
try:
with open('bulk_import.m3u', 'w', encoding='utf-8') as f:
f.write('#EXTM3U\n')
print("🧹 Cleared import file")
except:
pass
print("\n🎉 ENHANCED PROCESSING COMPLETE!")
print("✅ 3-point analysis applied to all channels")
print("✅ Countries detected from EPG ID, Logo URL, and Channel Names")
print("✅ Streaming services filtered to Uncategorized")
print("✅ Clean country-organized playlist generated")
return True
if __name__ == "__main__":
success = main()
exit(0 if success else 1), '', cleaned)
cleaned = re.sub(r'\s+group-title.*
def reorganize_channels(channels):
"""Enhanced reorganization with 3-point analysis."""
print("🔍 Enhanced Country Detection with 3-Point Analysis")
print("📊 Analyzing: Channel Name + EPG ID + Logo URL")
print("-" * 60)
changes = 0
stats = {
'country_detected': 0,
'sent_to_uncategorized': 0,
'kept_existing_country': 0
}
country_counts = {}
for channel in channels:
old_group = channel.get('Group', 'Uncategorized')
stream_name = channel.get('Stream name', '')
epg_id = channel.get('EPG id', '')
logo = channel.get('Logo', '')
stream_url = channel.get('Stream URL', '')
# Detect country using enhanced 3-point analysis
detected_country = detect_country_from_channel_content(stream_name, epg_id, logo, stream_url)
# Decide final group
if is_valid_country_group(old_group) and detected_country != "Uncategorized":
# Keep existing valid country
final_group = old_group
stats['kept_existing_country'] += 1
elif detected_country != "Uncategorized":
# Use detected country
final_group = detected_country
stats['country_detected'] += 1
if old_group != detected_country:
print(f"🔍 Fixed: '{stream_name}' {old_group}{detected_country}")
changes += 1
else:
# Send to Uncategorized
final_group = "Uncategorized"
stats['sent_to_uncategorized'] += 1
if old_group != "Uncategorized":
print(f"📱 Platform: '{stream_name}' → Uncategorized")
changes += 1
channel['Group'] = final_group
country_counts[final_group] = country_counts.get(final_group, 0) + 1
print(f"\n📊 PROCESSING RESULTS:")
print(f"✅ Changes made: {changes}")
print(f"🔍 Country detected: {stats['country_detected']}")
print(f"✅ Kept existing countries: {stats['kept_existing_country']}")
print(f"📱 Sent to Uncategorized: {stats['sent_to_uncategorized']}")
print(f"\n🌍 FINAL GROUP DISTRIBUTION:")
sorted_countries = sorted(country_counts.items(), key=lambda x: (x[0] == "Uncategorized", -x[1]))
for country, count in sorted_countries:
print(f" {country}: {count} channels")
return channels
def save_channels(channels):
"""Save channels to file."""
# Backup
if os.path.exists('channels.txt'):
backup = f"channels_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
shutil.copy2('channels.txt', backup)
print(f"📋 Backup: {backup}")
try:
with open('channels.txt', 'w', encoding='utf-8') as f:
for i, channel in enumerate(channels):
if i > 0:
f.write("\n\n")
f.write(f"Group = {channel.get('Group', 'Uncategorized')}\n")
f.write(f"Stream name = {channel.get('Stream name', 'Unknown')}\n")
f.write(f"Logo = {channel.get('Logo', '')}\n")
f.write(f"EPG id = {channel.get('EPG id', '')}\n")
f.write(f"Stream URL = {channel.get('Stream URL', '')}\n")
print(f"✅ Saved {len(channels)} channels")
return True
except Exception as e:
print(f"❌ Save error: {e}")
return False
def generate_m3u(channels):
"""Generate M3U playlist."""
try:
with open('playlist.m3u', 'w', encoding='utf-8') as f:
f.write('#EXTM3U\n')
for channel in channels:
name = channel.get('Stream name', '')
group = channel.get('Group', 'Uncategorized')
logo = channel.get('Logo', '')
epg_id = channel.get('EPG id', '')
url = channel.get('Stream URL', '')
if name and url:
f.write(f'#EXTINF:-1 group-title="{group}"')
if logo:
f.write(f' tvg-logo="{logo}"')
if epg_id:
f.write(f' tvg-id="{epg_id}"')
f.write(f',{name}\n{url}\n')
print("✅ Generated playlist.m3u")
return True
except Exception as e:
print(f"❌ M3U error: {e}")
return False
def main():
"""Main function."""
print("🎯 Enhanced IPTV Country Detection - 3-Point Analysis")
print("=" * 70)
print("🔍 Analyzes: Channel Name + EPG ID + Logo URL")
print("🎯 Filters: Only countries remain, streaming services → Uncategorized")
print("=" * 70)
channels = load_channels()
if not channels:
return False
# Enhanced reorganization
channels = reorganize_channels(channels)
# Sort: Countries first (alphabetically), then Uncategorized last
channels.sort(key=lambda x: (
"zzz" if x.get('Group') == "Uncategorized" else x.get('Group', ''),
x.get('Stream name', '')
))
# Save and generate
if not save_channels(channels):
return False
if not generate_m3u(channels):
return False
# Clear import
try:
with open('bulk_import.m3u', 'w', encoding='utf-8') as f:
f.write('#EXTM3U\n')
print("🧹 Cleared import file")
except:
pass
print("\n🎉 ENHANCED PROCESSING COMPLETE!")
print("✅ 3-point analysis applied to all channels")
print("✅ Countries detected from EPG ID, Logo URL, and Channel Names")
print("✅ Streaming services filtered to Uncategorized")
print("✅ Clean country-organized playlist generated")
return True
if __name__ == "__main__":
success = main()
exit(0 if success else 1), '', cleaned)
return cleaned if cleaned and len(cleaned) > 1 else "Unknown Channel"
def extract_epg_from_malformed(raw_name):
"""Extract EPG ID from malformed data."""
# Look for domain patterns like .AB.ca, .ON.ca, etc.
domain_match = re.search(r'\.([A-Z]{2})\.ca', raw_name)
if domain_match:
province = domain_match.group(1)
return f"generic.{province}.ca"
# Look for .us domains
domain_match = re.search(r'\.([A-Z]{2})\.us', raw_name)
if domain_match:
state = domain_match.group(1)
return f"generic.{state}.us"
return ""
def load_channels():
"""Load channels from channels.txt with integrated data cleanup."""
if not os.path.exists('channels.txt'):
print("❌ No channels.txt found")
return []
try:
with open('channels.txt', 'r', encoding='utf-8') as f:
content = f.read()
channels = []
cleaned_count = 0
print("🧹 Step 1: Data Cleanup (fixing malformed entries)")
print("-" * 50)
for block in content.split('\n\n'):
if not block.strip():
continue
channel_data = {}
for line in block.strip().split('\n'):
if '=' in line:
key, value = line.split('=', 1)
key = key.strip()
value = value.strip()
if key == "Stream name":
# Check if this is malformed
if (value.startswith('".') or 'tvg-logo=' in value or
'group-title=' in value or value.count('"') > 2):
# Clean the malformed name
clean_name = clean_malformed_channel_name(value)
channel_data["Stream name"] = clean_name
# Extract EPG ID if missing
if not channel_data.get("EPG id"):
extracted_epg = extract_epg_from_malformed(value)
if extracted_epg:
channel_data["EPG id"] = extracted_epg
cleaned_count += 1
if cleaned_count <= 10: # Show first 10 examples
print(f"🔧 Fixed: '{value[:40]}...''{clean_name}'")
else:
channel_data[key] = value
else:
channel_data[key] = value
# Only add channels with valid names
if (channel_data.get('Stream name') and
len(channel_data.get('Stream name', '')) > 1 and
channel_data.get('Stream name') != "Unknown Channel"):
channels.append(channel_data)
print(f"✅ Data cleanup complete: {cleaned_count} entries fixed")
print(f"📊 Loaded {len(channels)} channels (after cleanup)")
return channels
except Exception as e:
print(f"❌ Error loading channels: {e}")
return []
def reorganize_channels(channels):
"""Enhanced reorganization with 3-point analysis."""
print("🔍 Enhanced Country Detection with 3-Point Analysis")
print("📊 Analyzing: Channel Name + EPG ID + Logo URL")
print("-" * 60)
changes = 0
stats = {
'country_detected': 0,
'sent_to_uncategorized': 0,
'kept_existing_country': 0
}
country_counts = {}
for channel in channels:
old_group = channel.get('Group', 'Uncategorized')
stream_name = channel.get('Stream name', '')
epg_id = channel.get('EPG id', '')
logo = channel.get('Logo', '')
stream_url = channel.get('Stream URL', '')
# Detect country using enhanced 3-point analysis
detected_country = detect_country_from_channel_content(stream_name, epg_id, logo, stream_url)
# Decide final group
if is_valid_country_group(old_group) and detected_country != "Uncategorized":
# Keep existing valid country
final_group = old_group
stats['kept_existing_country'] += 1
elif detected_country != "Uncategorized":
# Use detected country
final_group = detected_country
stats['country_detected'] += 1
if old_group != detected_country:
print(f"🔍 Fixed: '{stream_name}' {old_group}{detected_country}")
changes += 1
else:
# Send to Uncategorized
final_group = "Uncategorized"
stats['sent_to_uncategorized'] += 1
if old_group != "Uncategorized":
print(f"📱 Platform: '{stream_name}' → Uncategorized")
changes += 1
channel['Group'] = final_group
country_counts[final_group] = country_counts.get(final_group, 0) + 1
print(f"\n📊 PROCESSING RESULTS:")
print(f"✅ Changes made: {changes}")
print(f"🔍 Country detected: {stats['country_detected']}")
print(f"✅ Kept existing countries: {stats['kept_existing_country']}")
print(f"📱 Sent to Uncategorized: {stats['sent_to_uncategorized']}")
print(f"\n🌍 FINAL GROUP DISTRIBUTION:")
sorted_countries = sorted(country_counts.items(), key=lambda x: (x[0] == "Uncategorized", -x[1]))
for country, count in sorted_countries:
print(f" {country}: {count} channels")
return channels
def save_channels(channels):
"""Save channels to file."""
# Backup
if os.path.exists('channels.txt'):
backup = f"channels_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
shutil.copy2('channels.txt', backup)
print(f"📋 Backup: {backup}")
try:
with open('channels.txt', 'w', encoding='utf-8') as f:
for i, channel in enumerate(channels):
if i > 0:
f.write("\n\n")
f.write(f"Group = {channel.get('Group', 'Uncategorized')}\n")
f.write(f"Stream name = {channel.get('Stream name', 'Unknown')}\n")
f.write(f"Logo = {channel.get('Logo', '')}\n")
f.write(f"EPG id = {channel.get('EPG id', '')}\n")
f.write(f"Stream URL = {channel.get('Stream URL', '')}\n")
print(f"✅ Saved {len(channels)} channels")
return True
except Exception as e:
print(f"❌ Save error: {e}")
return False
def generate_m3u(channels):
"""Generate M3U playlist."""
try:
with open('playlist.m3u', 'w', encoding='utf-8') as f:
f.write('#EXTM3U\n')
for channel in channels:
name = channel.get('Stream name', '')
group = channel.get('Group', 'Uncategorized')
logo = channel.get('Logo', '')
epg_id = channel.get('EPG id', '')
url = channel.get('Stream URL', '')
if name and url:
f.write(f'#EXTINF:-1 group-title="{group}"')
if logo:
f.write(f' tvg-logo="{logo}"')
if epg_id:
f.write(f' tvg-id="{epg_id}"')
f.write(f',{name}\n{url}\n')
print("✅ Generated playlist.m3u")
return True
except Exception as e:
print(f"❌ M3U error: {e}")
return False
def main():
"""Main function."""
print("🎯 Enhanced IPTV Country Detection - 3-Point Analysis")
print("=" * 70)
print("🔍 Analyzes: Channel Name + EPG ID + Logo URL")
print("🎯 Filters: Only countries remain, streaming services → Uncategorized")
print("=" * 70)
channels = load_channels()
if not channels:
return False
# Enhanced reorganization
channels = reorganize_channels(channels)
# Sort: Countries first (alphabetically), then Uncategorized last
channels.sort(key=lambda x: (
"zzz" if x.get('Group') == "Uncategorized" else x.get('Group', ''),
x.get('Stream name', '')
))
# Save and generate
if not save_channels(channels):
return False
if not generate_m3u(channels):
return False
# Clear import
try:
with open('bulk_import.m3u', 'w', encoding='utf-8') as f:
f.write('#EXTM3U\n')
print("🧹 Cleared import file")
except:
pass
print("\n🎉 ENHANCED PROCESSING COMPLETE!")
print("✅ 3-point analysis applied to all channels")
print("✅ Countries detected from EPG ID, Logo URL, and Channel Names")
print("✅ Streaming services filtered to Uncategorized")
print("✅ Clean country-organized playlist generated")
return True
if __name__ == "__main__": if __name__ == "__main__":
success = main() success = main()
exit(0 if success else 1) exit(0 if success else 1)