Update scripts/generate_playlist.py
All checks were successful
📺 Generate M3U Playlist / build (push) Successful in 2m0s

This commit is contained in:
stoney420 2025-06-27 19:20:53 +02:00
parent 6927ac929a
commit aca5d27832

View file

@ -3,13 +3,11 @@ import os
import json
from datetime import datetime
# --- Configuration ---
# Core data files used by the playlist generator.
CHANNELS_FILE = 'channels.txt'       # persistent channel database (block format)
PLAYLIST_FILE = 'playlist.m3u'       # generated M3U output
IMPORT_FILE = 'bulk_import.m3u'      # optional bulk-import source, consumed on each run
LOG_FILE = 'playlist_update.log'     # per-run log (truncated at start of generation)
# Optional JSON config files; defaults are used when absent.
SETTINGS_FILE = 'config/settings.json'
GROUP_OVERRIDES_FILE = 'config/group_overrides.json'
@ -27,13 +25,16 @@ def log_message(message, level="INFO"):
print(formatted_message)
def load_settings():
"""Load settings with defaults."""
"""Load settings with enhanced defaults."""
default_settings = {
"remove_duplicates": True,
"sort_channels": True,
"backup_before_import": True,
"auto_cleanup_import": True,
"auto_detect_country": True
"auto_detect_country": True,
"detect_quality": True,
"skip_adult_content": True,
"min_channel_name_length": 2
}
if os.path.exists(SETTINGS_FILE):
@ -58,113 +59,201 @@ def load_group_overrides():
return {}
def detect_country_from_channel(channel_name, epg_id="", logo_url=""):
    """Comprehensive country detection with 100+ countries.

    Matches lowercase keywords against the combined channel name, EPG id
    and logo URL. Returns a flag-prefixed country group name, a themed
    international group (sports/news/kids/movies/music) as a fallback,
    or "Uncategorized" when nothing matches.
    """
    name_lower = channel_name.lower().strip()
    epg_lower = epg_id.lower().strip()
    logo_lower = logo_url.lower().strip()
    # Search one combined haystack so a hint in any field can match.
    all_text = f"{name_lower} {epg_lower} {logo_lower}"
    log_message(f"Detecting country for: '{channel_name}'", "DEBUG")
    # Comprehensive patterns - shortened for space
    patterns = {
        "🇺🇸 United States": ["cbs", "nbc", "abc", "fox", "espn", "cnn", "hbo", " usa", " us ", ".us", "america", "nfl"],
        "🇬🇧 United Kingdom": ["bbc", "itv", "sky", "channel 4", "e4", " uk", ".uk", "british", "premier league"],
        "🇨🇦 Canada": ["cbc", "ctv", "global", "canada", "canadian", " ca ", ".ca"],
        "🇩🇪 Germany": ["ard", "zdf", "rtl", "sat.1", "pro7", "germany", "german", " de ", ".de"],
        "🇫🇷 France": ["tf1", "france 2", "m6", "canal+", "france", "french", " fr ", ".fr"],
        "🇪🇸 Spain": ["tve", "antena 3", "telecinco", "spain", "spanish", " es ", ".es"],
        "🇮🇹 Italy": ["rai", "mediaset", "canale 5", "italy", "italian", " it ", ".it"],
        "🇳🇱 Netherlands": ["npo", "rtl nl", "netherlands", "dutch", "holland", " nl ", ".nl"],
        "🇧🇪 Belgium": ["vtm", "één", "canvas", "belgium", "belgian", " be ", ".be"],
        "🇨🇭 Switzerland": ["srf", "rts", "switzerland", "swiss", " ch ", ".ch"],
        "🇦🇹 Austria": ["orf", "austria", "austrian", " at ", ".at"],
        "🇵🇹 Portugal": ["rtp", "sic", "tvi", "portugal", "portuguese", " pt ", ".pt"],
        "🇮🇪 Ireland": ["rte", "tg4", "ireland", "irish", " ie ", ".ie"],
        "🇸🇪 Sweden": ["svt", "tv4", "sweden", "swedish", " se ", ".se"],
        "🇳🇴 Norway": ["nrk", "tv 2 no", "norway", "norwegian", " no ", ".no"],
        "🇩🇰 Denmark": ["dr", "tv2 dk", "denmark", "danish", " dk ", ".dk"],
        "🇫🇮 Finland": ["yle", "mtv3", "finland", "finnish", " fi ", ".fi"],
        "🇮🇸 Iceland": ["ruv", "iceland", "icelandic", " is ", ".is"],
        "🇷🇺 Russia": ["channel one", "rossiya", "ntv", "russia", "russian", " ru ", ".ru"],
        "🇵🇱 Poland": ["tvp", "polsat", "tvn", "poland", "polish", " pl ", ".pl"],
        "🇨🇿 Czech Republic": ["ct", "nova", "prima", "czech", " cz ", ".cz"],
        "🇸🇰 Slovakia": ["rtvs", "markiza", "slovakia", "slovak", " sk ", ".sk"],
        "🇭🇺 Hungary": ["mtv hu", "rtl klub", "hungary", "hungarian", " hu ", ".hu"],
        "🇺🇦 Ukraine": ["1+1", "inter", "ictv", "ukraine", "ukrainian", " ua ", ".ua"],
        "🇷🇴 Romania": ["tvr", "pro tv", "romania", "romanian", " ro ", ".ro"],
        "🇧🇬 Bulgaria": ["btv", "nova bg", "bulgaria", "bulgarian", " bg ", ".bg"],
        "🇭🇷 Croatia": ["hrt", "nova tv hr", "croatia", "croatian", " hr ", ".hr"],
        "🇷🇸 Serbia": ["rts", "pink", "serbia", "serbian", " rs ", ".rs"],
        "🇬🇷 Greece": ["ert", "mega gr", "greece", "greek", " gr ", ".gr"],
        "🇧🇷 Brazil": ["globo", "band", "sbt", "brazil", "brasil", " br ", ".br"],
        "🇦🇷 Argentina": ["telefe", "canal 13", "argentina", " ar ", ".ar"],
        "🇲🇽 Mexico": ["televisa", "tv azteca", "mexico", "méxico", " mx ", ".mx"],
        "🇨🇱 Chile": ["tvn", "mega", "chile", "chilean", " cl ", ".cl"],
        "🇨🇴 Colombia": ["caracol", "rcn", "colombia", "colombian", " co ", ".co"],
        "🇵🇪 Peru": ["america tv pe", "peru", "peruvian", " pe ", ".pe"],
        "🇻🇪 Venezuela": ["venevision", "venezuela", "venezuelan", " ve ", ".ve"],
        "🇨🇳 China": ["cctv", "phoenix", "china", "chinese", " cn ", ".cn"],
        "🇯🇵 Japan": ["nhk", "fuji", "tv asahi", "japan", "japanese", " jp ", ".jp"],
        "🇰🇷 South Korea": ["kbs", "sbs kr", "mbc kr", "korea", "korean", " kr ", ".kr"],
        "🇰🇵 North Korea": ["kctv", "north korea", "dprk"],
        "🇹🇼 Taiwan": ["cts", "ctv", "tvbs", "taiwan", "taiwanese", " tw ", ".tw"],
        "🇭🇰 Hong Kong": ["tvb", "atv", "hong kong", "hongkong", " hk ", ".hk"],
        "🇹🇭 Thailand": ["ch3", "ch7", "thai pbs", "thailand", "thai", " th ", ".th"],
        "🇻🇳 Vietnam": ["vtv", "htv", "vietnam", "vietnamese", " vn ", ".vn"],
        "🇮🇩 Indonesia": ["tvri", "sctv", "rcti", "indonesia", "indonesian", " id ", ".id"],
        "🇲🇾 Malaysia": ["tv1", "tv3", "astro", "malaysia", "malaysian", " my ", ".my"],
        "🇸🇬 Singapore": ["channel 5", "channel 8", "singapore", " sg ", ".sg"],
        "🇵🇭 Philippines": ["abs-cbn", "gma", "philippines", "filipino", " ph ", ".ph"],
        "🇮🇳 India": ["star plus", "zee tv", "colors", "sony tv", "india", "indian", "hindi", " in ", ".in"],
        "🇵🇰 Pakistan": ["ptv", "geo tv", "ary", "pakistan", "pakistani", " pk ", ".pk"],
        "🇧🇩 Bangladesh": ["btv", "channel i", "bangladesh", "bangladeshi", " bd ", ".bd"],
        "🇱🇰 Sri Lanka": ["rupavahini", "sirasa", "sri lanka", " lk ", ".lk"],
        "🇳🇵 Nepal": ["nepal tv", "kantipur", "nepal", "nepali", " np ", ".np"],
        "🇦🇫 Afghanistan": ["rta", "tolo tv", "afghanistan", "afghan", " af ", ".af"],
        "🇦🇺 Australia": ["abc au", "seven", "nine", "ten", "australia", "australian", "aussie", " au ", ".au"],
        "🇳🇿 New Zealand": ["tvnz", "tvnz 1", "tvnz 2", "three nz", "tvnz duke", "new zealand", "kiwi", " nz ", ".nz"],
        "🇸🇦 Arabic": ["al jazeera", "mbc", "lbc", "dubai tv", "arabic", "arab", "qatar", "dubai", "saudi"],
        "🇮🇱 Israel": ["kan", "keshet 12", "israel", "israeli", "hebrew", " il ", ".il"],
        "🇹🇷 Turkey": ["trt", "atv", "kanal d", "turkey", "turkish", " tr ", ".tr"],
        "🇮🇷 Iran": ["irib", "press tv", "iran", "iranian", "persian", " ir ", ".ir"],
        "🇪🇬 Egypt": ["nile tv", "cbc egypt", "egypt", "egyptian", " eg ", ".eg"],
        "🇿🇦 South Africa": ["sabc", "etv", "mnet", "south africa", " za ", ".za"],
        "🇳🇬 Nigeria": ["nta", "channels tv", "nigeria", "nigerian", " ng ", ".ng"]
    }
    # Check patterns in insertion order; first keyword hit wins.
    for country, keywords in patterns.items():
        for keyword in keywords:
            if keyword in all_text:
                log_message(f"Detected {country} for: {channel_name} (matched: '{keyword}')", "INFO")
                return country
    # Special categories: no country matched, so fall back to a themed group.
    if any(sport in name_lower for sport in ["sport", "football", "soccer", "tennis", "basketball"]):
        return "🏈 Sports International"
    elif "news" in name_lower:
        return "📰 News International"
    elif any(kids in name_lower for kids in ["kids", "cartoon", "disney", "nick"]):
        return "👶 Kids International"
    elif any(movie in name_lower for movie in ["movie", "cinema", "film", "hollywood"]):
        return "🎬 Movies International"
    elif any(music in name_lower for music in ["music", "mtv", "vh1", "radio"]):
        return "🎵 Music International"
    # No match found
    log_message(f"No country detected for: {channel_name}", "DEBUG")
    return "Uncategorized"
def detect_quality(channel_name):
    """Detect quality from channel name."""
    lowered = channel_name.lower()
    # Ordered most-specific-first so e.g. "fhd" is not swallowed by "hd".
    quality_markers = (
        ("4k", "4K"),
        ("uhd", "4K"),
        ("fhd", "FHD"),
        ("1080", "FHD"),
        ("hd", "HD"),
        ("sd", "SD"),
    )
    for marker, label in quality_markers:
        if marker in lowered:
            return label
    return ""
def is_adult_content(channel_name):
    """Check for adult content in a channel name.

    Returns True when the name contains an adult-content keyword.
    """
    name_lower = channel_name.lower()
    # Unambiguous markers: plain substring matching is safe here.
    if any(keyword in name_lower for keyword in ("xxx", "porn", "erotic", "playboy")):
        return True
    # "sex" and "adult" occur inside innocent words ("Essex", "Sussex"),
    # so require a word-start boundary to avoid false positives.
    return re.search(r'\b(sex|adult)', name_lower) is not None
def validate_channel(channel, settings):
    """Validate channel for import.

    Args:
        channel: dict with at least 'Stream name' and 'Stream URL'.
        settings: settings dict (see load_settings defaults).

    Returns:
        (is_valid, reason) tuple; reason explains a rejection.
    """
    name = channel.get('Stream name', '').strip()
    url = channel.get('Stream URL', '').strip()
    if not name or not url:
        return False, "Missing name or URL"
    if len(name) < settings.get('min_channel_name_length', 2):
        return False, "Name too short"
    if settings.get('skip_adult_content', True) and is_adult_content(name):
        return False, "Adult content filtered"
    # Require a real scheme: a bare "http" prefix would also accept
    # malformed URLs like "httpx://...". "rtmp" stays a prefix so that
    # rtmp://, rtmps:// and rtmpe:// all pass.
    if not url.startswith(('http://', 'https://', 'rtmp')):
        return False, "Invalid URL"
    return True, "Valid"
def apply_auto_country_detection(channel, group_overrides, settings):
    """Apply country detection and quality tags.

    Mutates and returns *channel*: a manual group override wins outright;
    otherwise a quality suffix may be appended to the stream name and the
    group is replaced by the auto-detected country.
    """
    stream_name = channel.get('Stream name', '')
    epg_id = channel.get('EPG id', '')
    logo_url = channel.get('Logo', '')
    # Manual overrides first (highest priority; short-circuits everything else)
    for key, new_group in group_overrides.items():
        if key.lower() in stream_name.lower():
            channel['Group'] = new_group
            log_message(f"Manual override: '{stream_name}' → {new_group}", "DEBUG")
            return channel
    # Add quality tag unless the name already carries it (e.g. "... HD")
    if settings.get('detect_quality', True):
        quality = detect_quality(stream_name)
        if quality and quality not in stream_name:
            channel['Stream name'] = f"{stream_name} [{quality}]"
    # Auto-detect country (may return themed groups or "Uncategorized")
    if settings.get('auto_detect_country', True):
        detected_country = detect_country_from_channel(stream_name, epg_id, logo_url)
        channel['Group'] = detected_country
        log_message(f"Auto-detected: '{stream_name}' → {detected_country}", "INFO")
    return channel
def parse_channel_block(block):
    """Parse channel block from channels.txt.

    Each non-empty line of the block is "Key = Value"; values may
    themselves contain '=' (only the first one splits). Lines without
    '=' are ignored. Returns a dict of stripped keys to stripped values.
    """
    channel_data = {}
    for line in block.strip().split('\n'):
        if '=' in line:
            key, value = line.split('=', 1)
            channel_data[key.strip()] = value.strip()
    return channel_data
def parse_m3u_entry(extinf_line, url_line):
    """Parse M3U entry.

    Extracts tvg-id, tvg-logo, group-title and the display name (text
    after the last comma) from an #EXTINF line. Any parsing failure
    yields a safe placeholder channel instead of raising.
    """
    channel = {}
    try:
        tvg_id_match = re.search(r'tvg-id="([^"]*)"', extinf_line)
        tvg_logo_match = re.search(r'tvg-logo="([^"]*)"', extinf_line)
        group_title_match = re.search(r'group-title="([^"]*)"', extinf_line)
        channel['EPG id'] = tvg_id_match.group(1) if tvg_id_match else ''
        channel['Logo'] = tvg_logo_match.group(1) if tvg_logo_match else ''
        channel['Group'] = group_title_match.group(1) if group_title_match else 'Uncategorized'
        # Display name follows the attribute list, after a comma.
        stream_name_match = re.search(r',\s*(.+)$', extinf_line)
        if stream_name_match:
            stream_name = stream_name_match.group(1).strip()
            # Collapse runs of whitespace so signatures compare cleanly.
            stream_name = re.sub(r'\s+', ' ', stream_name)
            channel['Stream name'] = stream_name
        else:
            channel['Stream name'] = 'Unknown Channel'
        channel['Stream URL'] = url_line.strip()
    except Exception as e:
        log_message(f"Error parsing M3U entry: {e}", "WARNING")
        channel = {
            'EPG id': '', 'Logo': '', 'Group': 'Uncategorized',
            'Stream name': 'Parse Error', 'Stream URL': url_line.strip()
        }
    return channel
@ -179,119 +268,96 @@ def convert_to_channels_txt_block(channel_data):
return "\n".join(block)
def get_channel_signature(channel):
    """Create signature for duplicate detection.

    Normalizes the name (lowercase, collapsed whitespace, punctuation
    and quality tags removed) and strips any query string from the URL,
    so "BBC One HD" and "bbc one" with tokenized URLs compare equal.
    """
    name = channel.get('Stream name', '').strip().lower()
    url = channel.get('Stream URL', '').strip().lower()
    # Clean name
    name_clean = re.sub(r'\s+', ' ', name)
    name_clean = re.sub(r'[^\w\s]', '', name_clean)
    name_clean = re.sub(r'\b(hd|fhd|4k|uhd|sd)\b', '', name_clean).strip()
    # Drop the query string: tokens/session ids vary per fetch.
    url_clean = url.split('?')[0] if '?' in url else url
    return f"{name_clean}|{url_clean}"
def remove_duplicates(channels, settings):
    """Remove duplicate channels.

    Keeps the first channel seen for each signature (see
    get_channel_signature). A no-op when the 'remove_duplicates'
    setting is False. Returns the filtered list.
    """
    if not settings.get('remove_duplicates', True):
        log_message("Duplicate removal disabled", "INFO")
        return channels
    seen_signatures = set()
    unique_channels = []
    duplicates = []   # names of dropped channels, kept for the log line
    for channel in channels:
        signature = get_channel_signature(channel)
        if signature not in seen_signatures:
            seen_signatures.add(signature)
            unique_channels.append(channel)
        else:
            duplicates.append(channel.get('Stream name', 'Unknown'))
    if duplicates:
        log_message(f"Removed {len(duplicates)} duplicates", "INFO")
    return unique_channels
def update_existing_channels_with_country_detection():
    """Re-detect countries for existing channels.

    Reads channels.txt, re-runs country detection on every channel, and
    rewrites the file (after a timestamped best-effort backup) when any
    group actually changed. No-op if channels.txt is missing.
    """
    if not os.path.exists(CHANNELS_FILE):
        log_message("No channels.txt file found", "WARNING")
        return
    settings = load_settings()
    group_overrides = load_group_overrides()
    log_message("Re-detecting countries for existing channels...", "INFO")
    with open(CHANNELS_FILE, 'r', encoding='utf-8') as f:
        content = f.read()
    # Channels are separated by one or more blank lines.
    channel_blocks = re.split(r'\n\s*\n+', content.strip())
    updated_channels = []
    changes = 0
    for block in channel_blocks:
        if block.strip():
            channel = parse_channel_block(block)
            if channel:
                old_group = channel.get('Group', 'Uncategorized')
                detected = detect_country_from_channel(
                    channel.get('Stream name', ''),
                    channel.get('EPG id', ''),
                    channel.get('Logo', '')
                )
                # Only adopt a concrete detection; never downgrade to
                # "Uncategorized" a group that was already set.
                if detected != "Uncategorized":
                    channel['Group'] = detected
                    if old_group != detected:
                        changes += 1
                        log_message(f"Updated: '{channel.get('Stream name')}' → {detected}", "INFO")
                updated_channels.append(channel)
    if changes > 0:
        # Best-effort backup before rewriting; failure must not block the update.
        backup_name = f"{CHANNELS_FILE}.backup.{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        try:
            import shutil
            shutil.copy2(CHANNELS_FILE, backup_name)
        except Exception:
            pass
        with open(CHANNELS_FILE, 'w', encoding='utf-8') as f:
            for i, channel in enumerate(updated_channels):
                if i > 0:
                    f.write("\n\n")
                f.write(convert_to_channels_txt_block(channel))
    log_message(f"Updated {changes} channels with country detection", "INFO")
def process_import():
"""Process bulk import file."""
"""Process bulk M3U import with comprehensive filtering."""
settings = load_settings()
group_overrides = load_group_overrides()
@ -299,7 +365,13 @@ def process_import():
log_message(f"No {IMPORT_FILE} found, skipping import", "INFO")
return []
log_message(f"Processing {IMPORT_FILE}...", "INFO")
log_message(f"Processing {IMPORT_FILE} for comprehensive bulk import...", "INFO")
stats = {
'total_lines': 0, 'extinf_lines': 0, 'parsed': 0, 'valid': 0,
'filtered_adult': 0, 'filtered_invalid': 0, 'duplicates': 0,
'already_existed': 0, 'final_imported': 0
}
imported_channels = []
@ -307,25 +379,34 @@ def process_import():
with open(IMPORT_FILE, 'r', encoding='utf-8') as f:
lines = f.readlines()
log_message(f"Found {len(lines)} lines in import file", "INFO")
stats['total_lines'] = len(lines)
log_message(f"Processing {len(lines)} lines...", "INFO")
i = 0
while i < len(lines):
line = lines[i].strip()
if line.startswith('#EXTINF:'):
stats['extinf_lines'] += 1
if i + 1 < len(lines):
extinf_line = line
url_line = lines[i+1].strip()
if not url_line or url_line.startswith('#'):
i += 1
continue
if url_line and not url_line.startswith('#'):
channel = parse_m3u_entry(extinf_line, url_line)
stats['parsed'] += 1
channel_data = parse_m3u_entry(extinf_line, url_line)
channel_data = apply_auto_country_detection(channel_data, group_overrides, settings)
is_valid, reason = validate_channel(channel, settings)
if not is_valid:
if "adult" in reason.lower():
stats['filtered_adult'] += 1
else:
stats['filtered_invalid'] += 1
i += 2
continue
if channel_data.get('Stream name') and channel_data.get('Stream URL'):
imported_channels.append(channel_data)
channel = apply_auto_country_detection(channel, group_overrides, settings)
imported_channels.append(channel)
stats['valid'] += 1
i += 2
else:
@ -333,87 +414,71 @@ def process_import():
else:
i += 1
log_message(f"Parsed {len(imported_channels)} channels from import", "INFO")
# Remove duplicates from import
if imported_channels:
original_count = len(imported_channels)
imported_channels = remove_duplicates(imported_channels, settings)
stats['duplicates'] = original_count - len(imported_channels)
# Check existing channels
existing_channels = []
if os.path.exists(CHANNELS_FILE):
with open(CHANNELS_FILE, 'r', encoding='utf-8') as f:
content = f.read()
channel_blocks = re.split(r'\n\s*\n+', content.strip())
for block in channel_blocks:
blocks = re.split(r'\n\s*\n+', content.strip())
for block in blocks:
if block.strip():
existing_channels.append(parse_channel_block(block))
existing_signatures = {get_channel_signature(ch) for ch in existing_channels}
existing_sigs = {get_channel_signature(ch) for ch in existing_channels}
new_channels = []
for channel in imported_channels:
if get_channel_signature(channel) not in existing_signatures:
if get_channel_signature(channel) not in existing_sigs:
new_channels.append(channel)
else:
stats['already_existed'] += 1
imported_channels = new_channels
log_message(f"Final import: {len(imported_channels)} new channels", "INFO")
# Write to channels.txt
stats['final_imported'] = len(imported_channels)
if imported_channels:
lines_before = 0
if os.path.exists(CHANNELS_FILE):
with open(CHANNELS_FILE, 'r', encoding='utf-8') as f:
lines_before = len(f.readlines())
with open(CHANNELS_FILE, 'a', encoding='utf-8') as f:
for i, channel in enumerate(imported_channels):
if i > 0 or lines_before > 0:
if i > 0 or os.path.getsize(CHANNELS_FILE) > 0:
f.write("\n\n")
block_content = convert_to_channels_txt_block(channel)
f.write(block_content)
log_message(f"Successfully imported {len(imported_channels)} channels", "INFO")
else:
log_message("No new channels to import", "INFO")
f.write(convert_to_channels_txt_block(channel))
except Exception as e:
log_message(f"Error processing import: {e}", "ERROR")
return imported_channels
# Clean up import file
log_message("=== COMPREHENSIVE IMPORT STATISTICS ===", "INFO")
for key, value in stats.items():
log_message(f"{key.replace('_', ' ').title()}: {value}", "INFO")
log_message("=== END STATISTICS ===", "INFO")
if settings.get('auto_cleanup_import', True):
try:
os.remove(IMPORT_FILE)
log_message(f"Cleaned up {IMPORT_FILE}", "INFO")
except Exception as e:
log_message(f"Could not remove {IMPORT_FILE}: {e}", "WARNING")
except:
pass
return imported_channels
def generate_playlist():
"""Main function."""
# Clear log
"""Main enhanced playlist generation function."""
if os.path.exists(LOG_FILE):
open(LOG_FILE, 'w').close()
log_message("Starting playlist generation...", "INFO")
log_message("Starting comprehensive playlist generation...", "INFO")
settings = load_settings()
group_overrides = load_group_overrides()
log_message(f"Settings loaded: {settings}", "INFO")
log_message(f"Group overrides loaded: {group_overrides}", "INFO")
# FIRST: Update existing channels with country detection
update_existing_channels_with_country_detection()
# Process import
imported_channels = process_import()
log_message(f"Import returned {len(imported_channels)} channels", "INFO")
# Read channels.txt (now with updated countries)
if not os.path.exists(CHANNELS_FILE):
log_message(f"Error: {CHANNELS_FILE} not found", "ERROR")
return
@ -421,7 +486,6 @@ def generate_playlist():
with open(CHANNELS_FILE, 'r', encoding='utf-8') as f:
content = f.read()
# Parse channels
channel_blocks = re.split(r'\n\s*\n+', content.strip())
parsed_channels = []
@ -429,24 +493,17 @@ def generate_playlist():
if block.strip():
channel = parse_channel_block(block)
if channel:
# Country detection already applied above, just load the channels
parsed_channels.append(channel)
log_message(f"Parsed {len(parsed_channels)} channels", "INFO")
# Remove duplicates
parsed_channels = remove_duplicates(parsed_channels, settings)
# Sort channels
if settings.get('sort_channels', True):
parsed_channels.sort(key=lambda x: (x.get('Group', '').lower(), x.get('Stream name', '').lower()))
log_message("Channels sorted by country and name", "INFO")
# Build M3U
m3u_lines = ["#EXTM3U"]
valid_channels = 0
# Count channels by country for stats
country_stats = {}
for channel in parsed_channels:
@ -471,23 +528,21 @@ def generate_playlist():
m3u_lines.append(stream_url)
valid_channels += 1
# Count by country
country_stats[group_name] = country_stats.get(group_name, 0) + 1
# Write M3U
try:
with open(PLAYLIST_FILE, 'w', encoding='utf-8') as f:
for line in m3u_lines:
f.write(line + '\n')
log_message(f"Generated {PLAYLIST_FILE} with {valid_channels} channels", "INFO")
# Log country statistics
log_message(f"Channels by country: {dict(sorted(country_stats.items(), key=lambda x: x[1], reverse=True))}", "INFO")
sorted_stats = dict(sorted(country_stats.items(), key=lambda x: x[1], reverse=True))
log_message(f"Channels by country: {sorted_stats}", "INFO")
except Exception as e:
log_message(f"Error writing playlist: {e}", "ERROR")
log_message("Playlist generation complete", "INFO")
log_message("Comprehensive playlist generation complete", "INFO")
# Script entry point: regenerate the playlist when run directly.
if __name__ == "__main__":
    generate_playlist()