my-private-iptv-m3u/scripts/generate_playlist.py

336 lines
11 KiB
Python
Raw Normal View History

2025-06-27 16:34:52 +02:00
import re
import os
2025-06-27 17:36:03 +02:00
import json
from datetime import datetime
2025-06-27 16:34:52 +02:00
# --- Simple Configuration ---
# NOTE(review): interleaved blame-timestamp lines from the scraped source were
# removed here — they are not valid Python.

CHANNELS_FILE = 'channels.txt'      # curated channel database ('Key = Value' blocks)
PLAYLIST_FILE = 'playlist.m3u'      # generated output playlist
IMPORT_FILE = 'bulk_import.m3u'     # drop-in M3U consumed (and deleted) on each run
LOG_FILE = 'playlist_update.log'    # run log, cleared at the start of each run

# Config files (optional)
SETTINGS_FILE = 'config/settings.json'
GROUP_OVERRIDES_FILE = 'config/group_overrides.json'
def log_message(message, level="INFO"):
    """Append a timestamped message to LOG_FILE and echo it to stdout."""
    stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    entry = f"[{stamp}] {level}: {message}"
    try:
        with open(LOG_FILE, 'a', encoding='utf-8') as handle:
            handle.write(entry + "\n")
    except Exception as exc:
        # Logging must never crash the run; report the failure and carry on.
        print(f"ERROR: Could not write to log: {exc}")
    print(entry)
def load_settings():
    """Return user settings from SETTINGS_FILE merged over built-in defaults."""
    defaults = {
        "remove_duplicates": True,
        "sort_channels": True,
        "backup_before_import": True,
        "auto_cleanup_import": True
    }
    if not os.path.exists(SETTINGS_FILE):
        return defaults
    try:
        with open(SETTINGS_FILE, 'r', encoding='utf-8') as handle:
            user_settings = json.load(handle)
    except Exception as exc:
        log_message(f"Could not load settings, using defaults: {exc}", "WARNING")
        return defaults
    # User values win over defaults.
    return {**defaults, **user_settings}
def load_group_overrides():
    """Return the group-override mapping from GROUP_OVERRIDES_FILE, or {} if unavailable."""
    if not os.path.exists(GROUP_OVERRIDES_FILE):
        return {}
    try:
        with open(GROUP_OVERRIDES_FILE, 'r', encoding='utf-8') as handle:
            return json.load(handle)
    except Exception as exc:
        log_message(f"Could not load group overrides: {exc}", "WARNING")
        return {}
def parse_channel_block(block):
    """Parse one channels.txt block ('Key = Value' lines) into a dict.

    Lines without '=' are ignored; only the first '=' splits key from value.
    """
    parsed = {}
    for raw_line in block.strip().split('\n'):
        if '=' not in raw_line:
            continue
        key, _, value = raw_line.partition('=')
        parsed[key.strip()] = value.strip()
    return parsed
def parse_m3u_entry(extinf_line, url_line):
    """Extract channel fields from an #EXTINF line and its following URL line."""
    def attr(name, fallback=''):
        # Attributes look like name="value"; missing ones get the fallback.
        found = re.search(name + r'="([^"]*)"', extinf_line)
        return found.group(1) if found else fallback

    channel = {
        'EPG id': attr('tvg-id'),
        'Logo': attr('tvg-logo'),
        'Group': attr('group-title', 'Uncategorized'),
        'TVG Name': attr('tvg-name'),
    }
    # Display name is everything after the first comma.
    name_match = re.search(r',(.+)$', extinf_line)
    channel['Stream name'] = name_match.group(1).strip() if name_match else 'Unknown Channel'
    channel['Stream URL'] = url_line.strip()
    return channel
def apply_group_overrides(channel, group_overrides):
    """Reassign channel['Group'] when a (case-insensitive) override key appears
    in the stream name; the first matching key wins. Mutates and returns channel."""
    name_lower = channel.get('Stream name', '').lower()
    for pattern, target_group in group_overrides.items():
        if pattern.lower() in name_lower:
            channel['Group'] = target_group
            break
    return channel
def convert_to_channels_txt_block(channel_data):
    """Render a channel dict as a channels.txt block (fixed field order)."""
    fields = [
        ('Group', 'Uncategorized'),
        ('Stream name', 'Unknown Channel'),
        ('Logo', ''),
        ('EPG id', ''),
        ('Stream URL', ''),
    ]
    return "\n".join(
        f"{label} = {channel_data.get(label, fallback)}" for label, fallback in fields
    )
def get_channel_signature(channel):
    """Build a normalized 'name|url' key used for duplicate detection."""
    name = channel.get('Stream name', '').strip().lower()
    url = channel.get('Stream URL', '').strip().lower()
    # Collapse whitespace runs, then strip punctuation from the name so that
    # cosmetic variations of the same channel name compare equal.
    name = re.sub(r'\s+', ' ', name)
    name = re.sub(r'[^\w\s]', '', name)
    return f"{name}|{url}"
def remove_duplicates(channels, settings):
    """Drop channels whose signature was already seen, preserving order.

    Honors the 'remove_duplicates' setting; logs what was removed.
    """
    if not settings.get('remove_duplicates', True):
        log_message("Duplicate removal disabled", "INFO")
        return channels

    seen = set()
    kept = []
    dropped = 0
    for entry in channels:
        sig = get_channel_signature(entry)
        if sig in seen:
            dropped += 1
        else:
            seen.add(sig)
            kept.append(entry)

    if dropped:
        log_message(f"Removed {dropped} duplicate channels", "INFO")
    else:
        log_message("No duplicates found", "INFO")
    return kept
def _parse_import_lines(lines, group_overrides):
    """Parse #EXTINF/URL pairs from raw M3U lines into channel dicts."""
    channels = []
    i = 0
    while i < len(lines):
        line = lines[i].strip()
        if line.startswith('#EXTINF:') and i + 1 < len(lines):
            url_line = lines[i + 1].strip()
            # A blank or comment line where the URL should be: re-scan from it.
            if not url_line or url_line.startswith('#'):
                i += 1
                continue
            channel = apply_group_overrides(
                parse_m3u_entry(line, url_line), group_overrides)
            if channel.get('Stream name') and channel.get('Stream URL'):
                channels.append(channel)
                i += 2
                continue
        i += 1
    return channels

def _load_existing_channels():
    """Read and parse every channel block currently in channels.txt."""
    if not os.path.exists(CHANNELS_FILE):
        return []
    with open(CHANNELS_FILE, 'r', encoding='utf-8') as f:
        content = f.read()
    # Blocks are separated by one or more blank lines.
    return [parse_channel_block(b)
            for b in re.split(r'\n\s*\n+', content.strip()) if b.strip()]

def _append_channels(channels):
    """Append channel blocks to channels.txt, blank-line separated."""
    lines_before = 0
    if os.path.exists(CHANNELS_FILE):
        with open(CHANNELS_FILE, 'r', encoding='utf-8') as f:
            lines_before = len(f.readlines())
    with open(CHANNELS_FILE, 'a', encoding='utf-8') as f:
        for idx, channel in enumerate(channels):
            # Separator before every block except the very first of an empty file.
            if idx > 0 or lines_before > 0:
                f.write("\n\n")
            f.write(convert_to_channels_txt_block(channel))

def process_import():
    """Import channels from bulk_import.m3u into channels.txt.

    Parses the import file, removes duplicates (within the import and
    against channels already in channels.txt), appends the remainder, and
    finally deletes the import file when 'auto_cleanup_import' is enabled.

    Returns:
        list[dict]: the newly imported channel dicts ([] when nothing new).

    NOTE(review): the 'backup_before_import' setting is loaded but currently
    has no effect — confirm whether a backup step was intended here.
    """
    settings = load_settings()
    group_overrides = load_group_overrides()

    if not os.path.exists(IMPORT_FILE):
        log_message(f"No {IMPORT_FILE} found, skipping import", "INFO")
        return []

    log_message(f"Processing {IMPORT_FILE}...", "INFO")
    imported_channels = []
    try:
        with open(IMPORT_FILE, 'r', encoding='utf-8') as f:
            lines = f.readlines()
        log_message(f"Found {len(lines)} lines in import file", "INFO")

        imported_channels = _parse_import_lines(lines, group_overrides)
        log_message(f"Parsed {len(imported_channels)} channels from import", "INFO")

        if imported_channels:
            # Dedupe within the import itself, then against existing channels.
            imported_channels = remove_duplicates(imported_channels, settings)
            existing_signatures = {get_channel_signature(ch)
                                   for ch in _load_existing_channels()}
            imported_channels = [ch for ch in imported_channels
                                 if get_channel_signature(ch) not in existing_signatures]
            log_message(f"Final import: {len(imported_channels)} new channels", "INFO")

            if imported_channels:
                _append_channels(imported_channels)
                log_message(f"Successfully imported {len(imported_channels)} channels", "INFO")
            else:
                log_message("No new channels to import", "INFO")
    except Exception as e:
        # Best-effort: report and return whatever was collected; skip cleanup
        # so the import file can be inspected/retried.
        log_message(f"Error processing import: {e}", "ERROR")
        return imported_channels

    if settings.get('auto_cleanup_import', True):
        try:
            os.remove(IMPORT_FILE)
            log_message(f"Cleaned up {IMPORT_FILE}", "INFO")
        except Exception as e:
            log_message(f"Could not remove {IMPORT_FILE}: {e}", "WARNING")

    return imported_channels
def generate_playlist():
    """Main entry point: run the bulk import, then regenerate playlist.m3u
    from channels.txt (dedupe, apply group overrides, optional sort)."""
    # Start each run with a fresh log.
    if os.path.exists(LOG_FILE):
        open(LOG_FILE, 'w').close()

    log_message("Starting playlist generation...", "INFO")

    settings = load_settings()
    group_overrides = load_group_overrides()

    # Pull in any pending bulk import first so it lands in this playlist.
    imported_channels = process_import()
    log_message(f"Import returned {len(imported_channels)} channels", "INFO")

    if not os.path.exists(CHANNELS_FILE):
        log_message(f"Error: {CHANNELS_FILE} not found", "ERROR")
        return

    with open(CHANNELS_FILE, 'r', encoding='utf-8') as f:
        content = f.read()

    # Channel blocks are separated by one or more blank lines.
    parsed_channels = []
    for block in re.split(r'\n\s*\n+', content.strip()):
        if block.strip():
            channel = parse_channel_block(block)
            if channel:
                parsed_channels.append(apply_group_overrides(channel, group_overrides))

    log_message(f"Parsed {len(parsed_channels)} channels", "INFO")

    parsed_channels = remove_duplicates(parsed_channels, settings)

    if settings.get('sort_channels', True):
        # Sort by group, then by channel name (case-insensitive).
        parsed_channels.sort(key=lambda c: (c.get('Group', '').lower(),
                                            c.get('Stream name', '').lower()))

    # Build the M3U body.
    m3u_lines = ["#EXTM3U"]
    valid_channels = 0
    for channel in parsed_channels:
        stream_name = channel.get('Stream name', '')
        stream_url = channel.get('Stream URL', '')
        if not stream_name or not stream_url:
            continue  # skip incomplete entries
        epg_id = channel.get('EPG id', '')
        logo_url = channel.get('Logo', '')
        group_name = channel.get('Group', 'Uncategorized')
        # FIX: honor a stored 'TVG Name' when present (parse_m3u_entry extracts
        # it, but it was previously discarded in favor of the display name).
        tvg_name = channel.get('TVG Name', '') or stream_name
        extinf_attrs = [
            f'tvg-id="{epg_id}"',
            f'tvg-logo="{logo_url}"',
            f'group-title="{group_name}"',
            f'tvg-name="{tvg_name}"'
        ]
        m3u_lines.append(f"#EXTINF:-1 {' '.join(extinf_attrs)},{stream_name}")
        m3u_lines.append(stream_url)
        valid_channels += 1

    try:
        with open(PLAYLIST_FILE, 'w', encoding='utf-8') as f:
            f.write("\n".join(m3u_lines) + "\n")
        log_message(f"Generated {PLAYLIST_FILE} with {valid_channels} channels", "INFO")
    except Exception as e:
        log_message(f"Error writing playlist: {e}", "ERROR")

    log_message("Playlist generation complete", "INFO")
if __name__ == "__main__":
generate_playlist()