# Source: my-private-iptv-m3u/scripts/generate_playlist.py
# Commit: stoney420 0614f34f86 — "Update scripts/generate_playlist.py"
# CI: 📺 Generate M3U Playlist / build (push) — failing after 1m38s
# Date: 2025-06-27 18:12:47 +02:00
# NOTE(review): the lines above were web-viewer residue (commit/CI metadata)
# captured by copy-paste; converted to comments so the file parses as Python.

import re
import os
import json
from datetime import datetime
# --- Configuration ---
CHANNELS_FILE = 'channels.txt'
PLAYLIST_FILE = 'playlist.m3u'
IMPORT_FILE = 'bulk_import.m3u'
LOG_FILE = 'playlist_update.log'
SETTINGS_FILE = 'config/settings.json'
GROUP_OVERRIDES_FILE = 'config/group_overrides.json'
# Country detection patterns
COUNTRY_PATTERNS = {
# United Kingdom
"🇬🇧 United Kingdom": [
"uk", "united kingdom", "britain", "british", "england", "scotland", "wales",
"bbc", "itv", "sky", "channel 4", "channel5", "dave", "really", "yesterday",
"drama", "pick", "alibi", "eden", "gold", "w+1", "more4", "e4", "film4",
"quest", "discovery uk", "eurosport uk", "bt sport"
],
# United States
"🇺🇸 United States": [
"usa", "us", "united states", "america", "american",
"cnn", "fox news", "msnbc", "abc", "nbc", "cbs", "espn", "fox sports",
"disney", "nickelodeon", "cartoon network", "tnt", "tbs", "usa network",
"fx", "amc", "discovery", "history", "nat geo", "hgtv", "food network"
],
# Canada
"🇨🇦 Canada": [
"canada", "canadian", "cbc", "ctv", "global", "city tv", "tvo", "ici",
"sportsnet", "tsn", "rds", "aptn", "ztele", "canal d", "tele quebec"
],
# Australia
"🇦🇺 Australia": [
"australia", "australian", "aussie", "abc au", "sbs", "nine", "ten",
"seven", "foxtel", "optus sport", "kayo"
],
# Germany
"🇩🇪 Germany": [
"germany", "german", "deutschland", "ard", "zdf", "rtl", "sat.1", "pro7",
"vox", "kabel", "sport1", "eurosport de", "sky de"
],
# France
"🇫🇷 France": [
"france", "french", "tf1", "france 2", "france 3", "france 5", "m6",
"canal+", "bfm", "cnews", "rmc", "eurosport fr"
],
# Spain
"🇪🇸 Spain": [
"spain", "spanish", "españa", "tve", "antena 3", "cuatro", "telecinco",
"la sexta", "canal sur", "telemadrid", "movistar"
],
# Italy
"🇮🇹 Italy": [
"italy", "italian", "italia", "rai", "mediaset", "canale 5", "italia 1",
"rete 4", "la7", "sky italia"
],
# Netherlands
"🇳🇱 Netherlands": [
"netherlands", "dutch", "nederland", "npo", "rtl nl", "sbs nl", "veronica",
"net5", "rtl 4", "rtl 5", "rtl 7"
],
# Belgium
"🇧🇪 Belgium": [
"belgium", "belgian", "vtm", "een", "canvas", "ketnet", "rtbf", "la une"
],
# Portugal
"🇵🇹 Portugal": [
"portugal", "portuguese", "rtp", "sic", "tvi", "porto canal", "benfica tv"
],
# India
"🇮🇳 India": [
"india", "indian", "hindi", "bollywood", "zee", "star plus", "colors",
"sony", "dd national", "aaj tak", "ndtv", "times now"
],
# Brazil
"🇧🇷 Brazil": [
"brazil", "brazilian", "brasil", "globo", "sbt", "record", "band",
"rede tv", "cultura", "sportv"
],
# Mexico
"🇲🇽 Mexico": [
"mexico", "mexican", "televisa", "tv azteca", "canal 5", "las estrellas",
"canal once", "imagen"
],
# Arabic/Middle East
"🇸🇦 Arabic": [
"arabic", "arab", "al jazeera", "mbc", "dubai", "abu dhabi", "qatar",
"saudi", "kuwait", "lebanon", "syria", "iraq", "jordan"
],
# Turkey
"🇹🇷 Turkey": [
"turkey", "turkish", "trt", "atv", "kanal d", "star tv", "fox tr",
"show tv", "ntv"
],
# Russia
"🇷🇺 Russia": [
"russia", "russian", "rt", "channel one", "россия", "нтв", "тнт"
],
# Poland
"🇵🇱 Poland": [
"poland", "polish", "tvp", "polsat", "tvn", "tv4", "canal+ pl"
],
# Sweden
"🇸🇪 Sweden": [
"sweden", "swedish", "svt", "tv4", "kanal 5", "tv6", "tv8"
],
# Norway
"🇳🇴 Norway": [
"norway", "norwegian", "nrk", "tv2", "tvnorge", "max"
],
# Denmark
"🇩🇰 Denmark": [
"denmark", "danish", "dr", "tv2 dk", "kanal 5 dk", "6eren"
],
# Finland
"🇫🇮 Finland": [
"finland", "finnish", "yle", "mtv3", "nelonen", "sub"
],
# Greece
"🇬🇷 Greece": [
"greece", "greek", "ert", "mega", "ant1", "alpha", "skai"
],
# China
"🇨🇳 China": [
"china", "chinese", "cctv", "cgtn", "phoenix", "tvb", "中国", "中央"
],
# Japan
"🇯🇵 Japan": [
"japan", "japanese", "nhk", "fuji tv", "tbs", "tv asahi", "nippon tv"
],
# South Korea
"🇰🇷 South Korea": [
"korea", "korean", "kbs", "mbc", "sbs", "jtbc", "tvn"
],
# International/Global
"🌍 International": [
"international", "global", "world", "euro", "euronews", "dw",
"france 24", "cnn international", "bbc world", "sky news",
"bloomberg", "cnbc", "discovery", "national geographic",
"animal planet", "history", "travel", "mtv", "vh1", "nickelodeon"
]
}
def log_message(message, level="INFO"):
"""Logs messages to file and prints them."""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
formatted_message = f"[{timestamp}] {level}: {message}"
try:
with open(LOG_FILE, 'a', encoding='utf-8') as f:
f.write(formatted_message + "\n")
except Exception as e:
print(f"ERROR: Could not write to log: {e}")
print(formatted_message)
def load_settings():
"""Load settings with defaults."""
default_settings = {
"remove_duplicates": True,
"sort_channels": True,
"backup_before_import": True,
"auto_cleanup_import": True,
"auto_detect_country": True,
"normalize_country_names": True
}
if os.path.exists(SETTINGS_FILE):
try:
with open(SETTINGS_FILE, 'r', encoding='utf-8') as f:
settings = json.load(f)
return {**default_settings, **settings}
except Exception as e:
log_message(f"Could not load settings, using defaults: {e}", "WARNING")
return default_settings
def detect_country_from_text(text):
"""Detect country from channel name, group, or other text."""
text_lower = text.lower()
# Score each country based on keyword matches
country_scores = {}
for country, keywords in COUNTRY_PATTERNS.items():
score = 0
for keyword in keywords:
if keyword in text_lower:
# Give higher score for exact matches and longer keywords
score += len(keyword) * (2 if keyword == text_lower else 1)
if score > 0:
country_scores[country] = score
# Return country with highest score
if country_scores:
best_country = max(country_scores, key=country_scores.get)
return best_country, country_scores[best_country]
return None, 0
def smart_country_detection(channel):
"""Smart country detection using multiple sources."""
# Sources to check (in order of priority)
sources = [
("Stream name", channel.get('Stream name', '')),
("Group", channel.get('Group', '')),
("EPG id", channel.get('EPG id', '')),
("Logo", channel.get('Logo', ''))
]
best_country = None
best_score = 0
detection_source = None
for source_name, text in sources:
if text:
country, score = detect_country_from_text(text)
if country and score > best_score:
best_country = country
best_score = score
detection_source = source_name
# Log detection for debugging
if best_country:
log_message(f"Country detection: '{channel.get('Stream name', 'Unknown')}'{best_country} (from {detection_source}, score: {best_score})", "DEBUG")
else:
log_message(f"Country detection: Could not detect country for '{channel.get('Stream name', 'Unknown')}'", "DEBUG")
return best_country or "🌍 International"
def load_group_overrides():
"""Load manual group overrides."""
if os.path.exists(GROUP_OVERRIDES_FILE):
try:
with open(GROUP_OVERRIDES_FILE, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
log_message(f"Could not load group overrides: {e}", "WARNING")
return {}
def apply_country_detection(channel, settings):
"""Apply country detection and overrides."""
original_group = channel.get('Group', 'Uncategorized')
# Check manual overrides first
group_overrides = load_group_overrides()
stream_name = channel.get('Stream name', '').lower()
for key, new_group in group_overrides.items():
if key.lower() in stream_name:
channel['Group'] = new_group
log_message(f"Manual override: '{channel.get('Stream name')}'{new_group}", "DEBUG")
return channel
# Auto-detect country if enabled
if settings.get('auto_detect_country', True):
detected_country = smart_country_detection(channel)
# Normalize existing country names if enabled
if settings.get('normalize_country_names', True):
channel['Group'] = detected_country
else:
# Only change if current group is not already a country
current_group_lower = original_group.lower()
is_already_country = any(
any(keyword in current_group_lower for keyword in keywords)
for keywords in COUNTRY_PATTERNS.values()
)
if not is_already_country:
channel['Group'] = detected_country
return channel
def parse_channel_block(block):
"""Parse a channel block from channels.txt."""
channel_data = {}
lines = block.strip().split('\n')
for line in lines:
if '=' in line:
key, value = line.split('=', 1)
key = key.strip()
value = value.strip()
channel_data[key] = value
return channel_data
def parse_m3u_entry(extinf_line, url_line):
"""Parse M3U entry."""
channel = {}
# Extract attributes
tvg_id_match = re.search(r'tvg-id="([^"]*)"', extinf_line)
tvg_logo_match = re.search(r'tvg-logo="([^"]*)"', extinf_line)
group_title_match = re.search(r'group-title="([^"]*)"', extinf_line)
tvg_name_match = re.search(r'tvg-name="([^"]*)"', extinf_line)
channel['EPG id'] = tvg_id_match.group(1) if tvg_id_match else ''
channel['Logo'] = tvg_logo_match.group(1) if tvg_logo_match else ''
channel['Group'] = group_title_match.group(1) if group_title_match else 'Uncategorized'
channel['TVG Name'] = tvg_name_match.group(1) if tvg_name_match else ''
# Stream name after the last comma
stream_name_match = re.search(r',(.+)$', extinf_line)
channel['Stream name'] = stream_name_match.group(1).strip() if stream_name_match else 'Unknown Channel'
channel['Stream URL'] = url_line.strip()
return channel
def convert_to_channels_txt_block(channel_data):
"""Convert to channels.txt format."""
block = []
block.append(f"Group = {channel_data.get('Group', 'Uncategorized')}")
block.append(f"Stream name = {channel_data.get('Stream name', 'Unknown Channel')}")
block.append(f"Logo = {channel_data.get('Logo', '')}")
block.append(f"EPG id = {channel_data.get('EPG id', '')}")
block.append(f"Stream URL = {channel_data.get('Stream URL', '')}")
return "\n".join(block)
def get_channel_signature(channel):
"""Create unique signature for duplicate detection."""
stream_name = channel.get('Stream name', '').strip().lower()
stream_url = channel.get('Stream URL', '').strip().lower()
# Clean name
stream_name_clean = re.sub(r'\s+', ' ', stream_name)
stream_name_clean = re.sub(r'[^\w\s]', '', stream_name_clean)
return f"{stream_name_clean}|{stream_url}"
def remove_duplicates(channels, settings):
"""Remove duplicate channels."""
if not settings.get('remove_duplicates', True):
log_message("Duplicate removal disabled", "INFO")
return channels
seen_signatures = set()
unique_channels = []
duplicate_count = 0
for channel in channels:
signature = get_channel_signature(channel)
if signature not in seen_signatures:
seen_signatures.add(signature)
unique_channels.append(channel)
else:
duplicate_count += 1
log_message(f"Duplicate removed: {channel.get('Stream name', 'Unknown')}", "DEBUG")
if duplicate_count > 0:
log_message(f"Removed {duplicate_count} duplicate channels", "INFO")
else:
log_message("No duplicates found", "INFO")
return unique_channels
def process_import():
"""Process bulk import file."""
settings = load_settings()
if not os.path.exists(IMPORT_FILE):
log_message(f"No {IMPORT_FILE} found, skipping import", "INFO")
return []
log_message(f"Processing {IMPORT_FILE}...", "INFO")
imported_channels = []
try:
with open(IMPORT_FILE, 'r', encoding='utf-8') as f:
lines = f.readlines()
log_message(f"Found {len(lines)} lines in import file", "INFO")
i = 0
while i < len(lines):
line = lines[i].strip()
if line.startswith('#EXTINF:'):
if i + 1 < len(lines):
extinf_line = line
url_line = lines[i+1].strip()
if not url_line or url_line.startswith('#'):
i += 1
continue
channel_data = parse_m3u_entry(extinf_line, url_line)
channel_data = apply_country_detection(channel_data, settings)
if channel_data.get('Stream name') and channel_data.get('Stream URL'):
imported_channels.append(channel_data)
i += 2
else:
i += 1
else:
i += 1
log_message(f"Parsed {len(imported_channels)} channels from import", "INFO")
# Remove duplicates from import
if imported_channels:
imported_channels = remove_duplicates(imported_channels, settings)
# Check existing channels
existing_channels = []
if os.path.exists(CHANNELS_FILE):
with open(CHANNELS_FILE, 'r', encoding='utf-8') as f:
content = f.read()
channel_blocks = re.split(r'\n\s*\n+', content.strip())
for block in channel_blocks:
if block.strip():
existing_channels.append(parse_channel_block(block))
existing_signatures = {get_channel_signature(ch) for ch in existing_channels}
new_channels = []
for channel in imported_channels:
if get_channel_signature(channel) not in existing_signatures:
new_channels.append(channel)
imported_channels = new_channels
log_message(f"Final import: {len(imported_channels)} new channels", "INFO")
# Write to channels.txt
if imported_channels:
lines_before = 0
if os.path.exists(CHANNELS_FILE):
with open(CHANNELS_FILE, 'r', encoding='utf-8') as f:
lines_before = len(f.readlines())
with open(CHANNELS_FILE, 'a', encoding='utf-8') as f:
for i, channel in enumerate(imported_channels):
if i > 0 or lines_before > 0:
f.write("\n\n")
block_content = convert_to_channels_txt_block(channel)
f.write(block_content)
log_message(f"Successfully imported {len(imported_channels)} channels", "INFO")
else:
log_message("No new channels to import", "INFO")
except Exception as e:
log_message(f"Error processing import: {e}", "ERROR")
return imported_channels
# Clean up import file
if settings.get('auto_cleanup_import', True):
try:
os.remove(IMPORT_FILE)
log_message(f"Cleaned up {IMPORT_FILE}", "INFO")
except Exception as e:
log_message(f"Could not remove {IMPORT_FILE}: {e}", "WARNING")
return imported_channels
def generate_playlist():
"""Main function."""
# Clear log
if os.path.exists(LOG_FILE):
open(LOG_FILE, 'w').close()
log_message("Starting playlist generation with smart country detection...", "INFO")
settings = load_settings()
# Process import
imported_channels = process_import()
log_message(f"Import returned {len(imported_channels)} channels", "INFO")
# Read channels.txt
if not os.path.exists(CHANNELS_FILE):
log_message(f"Error: {CHANNELS_FILE} not found", "ERROR")
return
with open(CHANNELS_FILE, 'r', encoding='utf-8') as f:
content = f.read()
# Parse channels
channel_blocks = re.split(r'\n\s*\n+', content.strip())
parsed_channels = []
for block in channel_blocks:
if block.strip():
channel = parse_channel_block(block)
if channel:
# Apply country detection to existing channels too
channel = apply_country_detection(channel, settings)
parsed_channels.append(channel)
log_message(f"Parsed {len(parsed_channels)} channels", "INFO")
# Remove duplicates
parsed_channels = remove_duplicates(parsed_channels, settings)
# Sort channels by country then name
if settings.get('sort_channels', True):
parsed_channels.sort(key=lambda x: (x.get('Group', '').lower(), x.get('Stream name', '').lower()))
log_message("Channels sorted by country and name", "INFO")
# Log country distribution
country_counts = {}
for channel in parsed_channels:
country = channel.get('Group', 'Unknown')
country_counts[country] = country_counts.get(country, 0) + 1
log_message("Country distribution:", "INFO")
for country, count in sorted(country_counts.items()):
log_message(f" {country}: {count} channels", "INFO")
# Build M3U
m3u_lines = ["#EXTM3U"]
valid_channels = 0
for channel in parsed_channels:
stream_name = channel.get('Stream name', '')
group_name = channel.get('Group', 'Uncategorized')
logo_url = channel.get('Logo', '')
epg_id = channel.get('EPG id', '')
stream_url = channel.get('Stream URL', '')
if not stream_name or not stream_url:
continue
extinf_attrs = [
f'tvg-id="{epg_id}"',
f'tvg-logo="{logo_url}"',
f'group-title="{group_name}"',
f'tvg-name="{stream_name}"'
]
extinf_line = f"#EXTINF:-1 {' '.join(extinf_attrs)},{stream_name}"
m3u_lines.append(extinf_line)
m3u_lines.append(stream_url)
valid_channels += 1
# Write M3U
try:
with open(PLAYLIST_FILE, 'w', encoding='utf-8') as f:
for line in m3u_lines:
f.write(line + '\n')
log_message(f"Generated {PLAYLIST_FILE} with {valid_channels} channels", "INFO")
except Exception as e:
log_message(f"Error writing playlist: {e}", "ERROR")
# Update channels.txt with new country assignments
if settings.get('normalize_country_names', True):
try:
with open(CHANNELS_FILE, 'w', encoding='utf-8') as f:
for i, channel in enumerate(parsed_channels):
if i > 0:
f.write("\n\n")
block_content = convert_to_channels_txt_block(channel)
f.write(block_content)
log_message("Updated channels.txt with normalized country names", "INFO")
except Exception as e:
log_message(f"Error updating channels.txt: {e}", "ERROR")
log_message("Playlist generation complete", "INFO")
if __name__ == "__main__":
generate_playlist()