Update scripts/generate_playlist.py

stoney420 2025-06-27 17:36:03 +02:00
parent 0721a8e9d2
commit efba1c1333


@@ -1,19 +1,74 @@
import re
import os
import json
from datetime import datetime

# --- Configuration ---
CHANNELS_FILE = 'channels.txt'
PLAYLIST_FILE = 'playlist.m3u'
IMPORT_FILE = 'bulk_import.m3u'
LOG_DIR = 'logs'
CONFIG_DIR = 'config'

# Log files
MAIN_LOG = os.path.join(LOG_DIR, 'playlist_update.log')
IMPORT_LOG = os.path.join(LOG_DIR, 'import_history.log')
ERROR_LOG = os.path.join(LOG_DIR, 'error.log')

# Config files
SETTINGS_FILE = os.path.join(CONFIG_DIR, 'settings.json')
GROUP_OVERRIDES_FILE = os.path.join(CONFIG_DIR, 'group_overrides.json')
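With these constants, the script works relative to the current directory and produces the following layout (the directories are created on demand by ensure_directories() below):

logs/
    playlist_update.log     (main log)
    import_history.log      (import statistics)
    error.log               (warnings and errors)
config/
    settings.json           (feature toggles)
    group_overrides.json    (group reassignment rules)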
# --- Helper Functions ---
def ensure_directories():
    """Create necessary directories if they don't exist."""
    os.makedirs(LOG_DIR, exist_ok=True)
    os.makedirs(CONFIG_DIR, exist_ok=True)

def load_settings():
    """Load settings from config file with defaults."""
    default_settings = {
        "remove_duplicates": True,
        "sort_channels": True,
        "validate_urls": False,
        "backup_before_import": True,
        "auto_cleanup_import": True
    }
    if os.path.exists(SETTINGS_FILE):
        try:
            with open(SETTINGS_FILE, 'r', encoding='utf-8') as f:
                settings = json.load(f)
                return {**default_settings, **settings}
        except Exception as e:
            log_message(f"Error loading settings, using defaults: {e}", "WARNING", ERROR_LOG)
    return default_settings

def load_group_overrides():
    """Load group name overrides from config file."""
    if os.path.exists(GROUP_OVERRIDES_FILE):
        try:
            with open(GROUP_OVERRIDES_FILE, 'r', encoding='utf-8') as f:
                return json.load(f)
        except Exception as e:
            log_message(f"Error loading group overrides: {e}", "WARNING", ERROR_LOG)
    return {}
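For reference, a config/settings.json holding the defaults above would look like the following; any subset of keys is valid, since load_settings() merges the file over the defaults:

{
    "remove_duplicates": true,
    "sort_channels": true,
    "validate_urls": false,
    "backup_before_import": true,
    "auto_cleanup_import": true
}

And config/group_overrides.json maps a case-insensitive substring of a stream name to the group the channel should be moved to (these entries are invented for illustration; see apply_group_overrides() below for the matching rule):

{
    "sport": "Sports",
    "news": "News"
}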
def log_message(message, level="INFO", log_file=MAIN_LOG):
    """Logs messages to the specified file and prints them."""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    formatted_message = f"[{timestamp}] {level}: {message}"
    try:
        with open(log_file, 'a', encoding='utf-8') as f:
            f.write(formatted_message + "\n")
    except Exception as e:
        print(f"ERROR: Could not write to log file {log_file}: {e}")
    print(formatted_message)
def parse_channel_block(block):
    """Parses a single channel block from channels.txt."""

@@ -32,7 +87,6 @@ def parse_channel_block(block):
def parse_m3u_entry(extinf_line, url_line):
    """Parses an M3U #EXTINF and URL line into a dictionary."""
    channel = {}
    tvg_id_match = re.search(r'tvg-id="([^"]*)"', extinf_line)
    tvg_logo_match = re.search(r'tvg-logo="([^"]*)"', extinf_line)
    group_title_match = re.search(r'group-title="([^"]*)"', extinf_line)

@@ -43,13 +97,25 @@ def parse_m3u_entry(extinf_line, url_line):
    channel['Group'] = group_title_match.group(1) if group_title_match else 'Uncategorized'
    channel['TVG Name'] = tvg_name_match.group(1) if tvg_name_match else ''
    stream_name_match = re.search(r',(.+)$', extinf_line)
    channel['Stream name'] = stream_name_match.group(1).strip() if stream_name_match else 'Unknown Channel'
    channel['Stream URL'] = url_line.strip()
    return channel
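For context, this is the shape of entry the parser handles, with the stream name taken from everything after the last comma (attribute values here are invented for illustration):

#EXTINF:-1 tvg-id="example.one" tvg-name="Example One" tvg-logo="https://example.com/one.png" group-title="News",Example One HD
https://example.com/streams/one.m3u8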
def apply_group_overrides(channel, group_overrides):
    """Apply group name overrides to channel."""
    stream_name = channel.get('Stream name', '').lower()
    current_group = channel.get('Group', 'Uncategorized')
    for key, new_group in group_overrides.items():
        if key.lower() in stream_name:
            log_message(f"Override: '{channel.get('Stream name')}' moved from '{current_group}' to '{new_group}'", "DEBUG")
            channel['Group'] = new_group
            break
    return channel
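A minimal sketch of the matching behavior, with an invented override table:

channel = {'Stream name': 'Euro Sport 2', 'Group': 'Uncategorized'}
channel = apply_group_overrides(channel, {"sport": "Sports"})
# 'sport' is a case-insensitive substring of 'euro sport 2',
# so channel['Group'] is now 'Sports'.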
def convert_m3u_to_channels_txt_block(m3u_channel_data):
    """Converts a parsed M3U channel entry to channels.txt block format."""
    block = []
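The body of this function is elided in the diff, so the exact line format lives in code not shown here. Judging by the fields the rest of the script reads (Group, Stream name, Stream URL, EPG id, plus the parsed logo), a channels.txt block presumably carries key/value lines along these lines, with blank lines separating blocks:

Group = News
Stream name = Example One HD
Logo = https://example.com/one.png
EPG id = example.one
Stream URL = https://example.com/streams/one.m3u8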
@@ -62,18 +128,20 @@ def convert_m3u_to_channels_txt_block(m3u_channel_data):
def get_channel_signature(channel):
    """Creates a unique signature for a channel to detect duplicates."""
    stream_name = channel.get('Stream name', '').strip().lower()
    stream_url = channel.get('Stream URL', '').strip().lower()
    stream_name_clean = re.sub(r'\s+', ' ', stream_name)
    stream_name_clean = re.sub(r'[^\w\s]', '', stream_name_clean)
    return f"{stream_name_clean}|{stream_url}"
def remove_duplicates_from_channels(channels, settings):
    """Removes duplicate channels based on stream name and URL."""
    if not settings.get('remove_duplicates', True):
        log_message("Duplicate removal disabled in settings", "INFO")
        return channels
    seen_signatures = set()
    unique_channels = []
    duplicate_count = 0
@@ -97,22 +165,51 @@ def remove_duplicates_from_channels(channels):
    return unique_channels
def backup_channels_file():
    """Create a backup of channels.txt before import."""
    if os.path.exists(CHANNELS_FILE):
        backup_name = f"{CHANNELS_FILE}.backup.{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        try:
            import shutil
            shutil.copy2(CHANNELS_FILE, backup_name)
            log_message(f"Created backup: {backup_name}", "INFO")
            return backup_name
        except Exception as e:
            log_message(f"Failed to create backup: {e}", "WARNING", ERROR_LOG)
    return None

def log_import_statistics(imported_count, duplicate_count, existing_count):
    """Log import statistics to import history."""
    stats = {
        "timestamp": datetime.now().isoformat(),
        "imported": imported_count,
        "duplicates_removed": duplicate_count,
        "already_existed": existing_count,
        "total_processed": imported_count + duplicate_count + existing_count
    }
    log_message(f"Import completed: {json.dumps(stats)}", "INFO", IMPORT_LOG)
def process_import_m3u():
    """Processes bulk_import.m3u to add channels to channels.txt."""
    settings = load_settings()
    group_overrides = load_group_overrides()

    if not os.path.exists(IMPORT_FILE):
        log_message(f"No {IMPORT_FILE} found, skipping import.", "INFO")
        return []

    log_message(f"Processing {IMPORT_FILE} for bulk import...", "INFO")

    if settings.get('backup_before_import', True):
        backup_channels_file()

    file_size = os.path.getsize(IMPORT_FILE)
    log_message(f"{IMPORT_FILE} file size: {file_size} bytes", "DEBUG")

    imported_channels = []
    try:
        log_message(f"Reading {IMPORT_FILE}...", "DEBUG")
        with open(IMPORT_FILE, 'r', encoding='utf-8') as f:
            lines = f.readlines()
@@ -128,18 +225,17 @@ def process_import_m3u():
                extinf_line = line
                url_line = lines[i+1].strip()

                if not url_line or url_line.startswith('#'):
                    i += 1
                    continue

                channel_data = parse_m3u_entry(extinf_line, url_line)
                channel_data = apply_group_overrides(channel_data, group_overrides)

                if channel_data.get('Stream name') and channel_data.get('Stream URL'):
                    imported_channels.append(channel_data)
                    parsed_count += 1
                    if parsed_count <= 3:
                        log_message(f"Sample channel {parsed_count}: {channel_data.get('Stream name')}", "DEBUG")
                else:
                    log_message(f"Skipping channel - missing required fields: {channel_data.get('Stream name', 'No name')}", "WARNING")
@@ -152,14 +248,16 @@ def process_import_m3u():
        log_message(f"Parsed {parsed_count} valid channels from {IMPORT_FILE}", "INFO")

        if imported_channels and settings.get('remove_duplicates', True):
            log_message("Checking imported channels for duplicates...", "INFO")
            original_import_count = len(imported_channels)
            imported_channels = remove_duplicates_from_channels(imported_channels, settings)
            import_duplicate_count = original_import_count - len(imported_channels)
            log_message(f"After duplicate removal: {len(imported_channels)} unique channels to import", "INFO")
        else:
            import_duplicate_count = 0

        if imported_channels:
            existing_channels = []
            if os.path.exists(CHANNELS_FILE):
                with open(CHANNELS_FILE, 'r', encoding='utf-8') as f:

@@ -171,7 +269,6 @@ def process_import_m3u():
                log_message(f"Found {len(existing_channels)} existing channels in {CHANNELS_FILE}", "DEBUG")

                existing_signatures = {get_channel_signature(ch) for ch in existing_channels}
                new_channels = []
                already_exists_count = 0
@@ -187,134 +284,120 @@ def process_import_m3u():
                imported_channels = new_channels
                log_message(f"Final import count: {len(imported_channels)} new unique channels", "INFO")
            else:
                already_exists_count = 0

            if imported_channels:
                lines_before = 0
                if os.path.exists(CHANNELS_FILE):
                    with open(CHANNELS_FILE, 'r', encoding='utf-8') as f:
                        lines_before = len(f.readlines())
                    log_message(f"{CHANNELS_FILE} has {lines_before} lines before import", "DEBUG")

                log_message(f"Attempting to append {len(imported_channels)} channels to {CHANNELS_FILE}...", "DEBUG")
                try:
                    with open(CHANNELS_FILE, 'a', encoding='utf-8') as f:
                        for i, channel in enumerate(imported_channels):
                            if i > 0 or lines_before > 0:
                                f.write("\n\n")
                            block_content = convert_m3u_to_channels_txt_block(channel)
                            f.write(block_content)
                            if i < 3:
                                log_message(f"Wrote channel {i+1}: {channel.get('Stream name', 'Unknown')}", "DEBUG")
                    log_message(f"Successfully appended {len(imported_channels)} unique channels to {CHANNELS_FILE}.", "INFO")
                    log_import_statistics(len(imported_channels), import_duplicate_count, already_exists_count)
                except Exception as write_error:
                    log_message(f"ERROR writing to {CHANNELS_FILE}: {write_error}", "ERROR", ERROR_LOG)
                    return imported_channels
            else:
                log_message("No new unique channels to import after duplicate checking.", "INFO")
                log_import_statistics(0, import_duplicate_count, already_exists_count)
        else:
            log_message(f"No valid channels found in {IMPORT_FILE}.", "INFO")

    except Exception as e:
        log_message(f"Error processing {IMPORT_FILE}: {e}", "ERROR", ERROR_LOG)
        return imported_channels

    if settings.get('auto_cleanup_import', True):
        cleanup_import_file()

    return imported_channels
def cleanup_import_file():
    """Clean up the import file after processing."""
    log_message(f"Attempting to clean up {IMPORT_FILE}...", "DEBUG")
    try:
        os.remove(IMPORT_FILE)
        log_message(f"Successfully deleted {IMPORT_FILE} after processing.", "INFO")
    except PermissionError as pe:
        log_message(f"Permission denied deleting {IMPORT_FILE}: {pe}", "WARNING")
        try:
            with open(IMPORT_FILE, 'w', encoding='utf-8') as f:
                f.write('')
            log_message(f"Cleared content of {IMPORT_FILE} instead.", "INFO")
        except Exception as clear_error:
            log_message(f"ERROR: Could not delete or clear {IMPORT_FILE}: {clear_error}", "ERROR", ERROR_LOG)
    except Exception as e:
        log_message(f"Unexpected error deleting {IMPORT_FILE}: {e}", "WARNING")
def generate_playlist():
    """Main function to generate the M3U playlist."""
    ensure_directories()
    settings = load_settings()
    group_overrides = load_group_overrides()

    if os.path.exists(MAIN_LOG):
        open(MAIN_LOG, 'w').close()

    log_message("Starting M3U playlist generation...", "INFO")
    log_message(f"Settings: {json.dumps(settings, indent=2)}", "DEBUG")

    imported_channels = process_import_m3u()
    log_message(f"Import process returned {len(imported_channels)} channels", "DEBUG")

    if not os.path.exists(CHANNELS_FILE):
        log_message(f"Error: {CHANNELS_FILE} not found.", "ERROR", ERROR_LOG)
        return

    with open(CHANNELS_FILE, 'r', encoding='utf-8') as f:
        content = f.read()
    log_message(f"Read {len(content)} characters from {CHANNELS_FILE}", "DEBUG")

    channel_blocks = re.split(r'\n\s*\n+', content.strip())
    log_message(f"Found {len(channel_blocks)} channel blocks in {CHANNELS_FILE}", "DEBUG")

    parsed_channels = []
    for i, block in enumerate(channel_blocks):
        if block.strip():
            channel = parse_channel_block(block)
            if channel:
                channel = apply_group_overrides(channel, group_overrides)
                parsed_channels.append(channel)
                if i < 5:
                    log_message(f"Parsed channel {i+1}: {channel.get('Stream name', 'Unknown')}", "DEBUG")

    log_message(f"Successfully parsed {len(parsed_channels)} channels from {CHANNELS_FILE}", "INFO")

    original_count = len(parsed_channels)
    parsed_channels = remove_duplicates_from_channels(parsed_channels, settings)
    final_count = len(parsed_channels)
    if original_count != final_count:
        log_message(f"Final M3U will have {final_count} unique channels (removed {original_count - final_count} duplicates)", "INFO")

    if settings.get('sort_channels', True):
        parsed_channels.sort(key=lambda x: (x.get('Group', '').lower(), x.get('Stream name', '').lower()))
        log_message("Channels sorted by group and name", "DEBUG")

    new_m3u_lines = ["#EXTM3U"]
    valid_channels = 0
    for channel in parsed_channels:
        stream_name = channel.get('Stream name', '')
        group_name = channel.get('Group', 'Uncategorized')
@@ -322,7 +405,6 @@ def generate_playlist():
        epg_id = channel.get('EPG id', '')
        stream_url = channel.get('Stream URL', '')

        if not stream_name or not stream_url:
            log_message(f"Skipping channel due to missing required field: {stream_name or 'Unknown'}", "WARNING")
            continue
@@ -339,14 +421,21 @@ def generate_playlist():
        new_m3u_lines.append(stream_url)
        valid_channels += 1

    try:
        with open(PLAYLIST_FILE, 'w', encoding='utf-8') as f:
            for line in new_m3u_lines:
                f.write(line + '\n')
        log_message(f"Successfully generated {PLAYLIST_FILE} with {valid_channels} unique channels.", "INFO")

        stats = {
            "total_channels": valid_channels,
            "groups": len(set(ch.get('Group', 'Uncategorized') for ch in parsed_channels)),
            "generation_time": datetime.now().isoformat()
        }
        log_message(f"Generation stats: {json.dumps(stats)}", "INFO")
    except Exception as e:
        log_message(f"Error writing {PLAYLIST_FILE}: {e}", "ERROR", ERROR_LOG)

    log_message("M3U playlist generation complete.", "INFO")