From d1d15772f24bd475223a462ffea5ca112dfb3f4e Mon Sep 17 00:00:00 2001
From: stoney420
Date: Fri, 27 Jun 2025 18:00:43 +0200
Subject: [PATCH] Update scripts/generate_playlist.py

---
 scripts/generate_playlist.py | 303 +++++++++++------------------------
 1 file changed, 98 insertions(+), 205 deletions(-)

diff --git a/scripts/generate_playlist.py b/scripts/generate_playlist.py
index af59b45..12eb8c7 100644
--- a/scripts/generate_playlist.py
+++ b/scripts/generate_playlist.py
@@ -3,35 +3,34 @@ import os
 import json
 from datetime import datetime

-# --- Configuration ---
+# --- Simple Configuration ---
 CHANNELS_FILE = 'channels.txt'
 PLAYLIST_FILE = 'playlist.m3u'
 IMPORT_FILE = 'bulk_import.m3u'
-LOG_DIR = 'logs'
-CONFIG_DIR = 'config'
+LOG_FILE = 'playlist_update.log'

-# Log files
-MAIN_LOG = os.path.join(LOG_DIR, 'playlist_update.log')
-IMPORT_LOG = os.path.join(LOG_DIR, 'import_history.log')
-ERROR_LOG = os.path.join(LOG_DIR, 'error.log')
+# Config files (optional)
+SETTINGS_FILE = 'config/settings.json'
+GROUP_OVERRIDES_FILE = 'config/group_overrides.json'

-# Config files
-SETTINGS_FILE = os.path.join(CONFIG_DIR, 'settings.json')
-GROUP_OVERRIDES_FILE = os.path.join(CONFIG_DIR, 'group_overrides.json')
-
-# --- Helper Functions ---
-
-def ensure_directories():
-    """Create necessary directories if they don't exist."""
-    os.makedirs(LOG_DIR, exist_ok=True)
-    os.makedirs(CONFIG_DIR, exist_ok=True)
+def log_message(message, level="INFO"):
+    """Logs messages to file and prints them."""
+    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+    formatted_message = f"[{timestamp}] {level}: {message}"
+
+    try:
+        with open(LOG_FILE, 'a', encoding='utf-8') as f:
+            f.write(formatted_message + "\n")
+    except Exception as e:
+        print(f"ERROR: Could not write to log: {e}")
+
+    print(formatted_message)

 def load_settings():
-    """Load settings from config file with defaults."""
+    """Load settings with defaults."""
     default_settings = {
         "remove_duplicates": True,
         "sort_channels": True,
-        "validate_urls": False,
         "backup_before_import": True,
         "auto_cleanup_import": True
     }
@@ -42,36 +41,23 @@ def load_settings():
             settings = json.load(f)
             return {**default_settings, **settings}
         except Exception as e:
-            log_message(f"Error loading settings, using defaults: {e}", "WARNING", ERROR_LOG)
+            log_message(f"Could not load settings, using defaults: {e}", "WARNING")
     return default_settings

 def load_group_overrides():
-    """Load group name overrides from config file."""
+    """Load group overrides."""
     if os.path.exists(GROUP_OVERRIDES_FILE):
         try:
             with open(GROUP_OVERRIDES_FILE, 'r', encoding='utf-8') as f:
                 return json.load(f)
         except Exception as e:
-            log_message(f"Error loading group overrides: {e}", "WARNING", ERROR_LOG)
+            log_message(f"Could not load group overrides: {e}", "WARNING")
     return {}

-def log_message(message, level="INFO", log_file=MAIN_LOG):
-    """Logs messages to specified file and prints them."""
-    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
-    formatted_message = f"[{timestamp}] {level}: {message}"
-
-    try:
-        with open(log_file, 'a', encoding='utf-8') as f:
-            f.write(formatted_message + "\n")
-    except Exception as e:
-        print(f"ERROR: Could not write to log file {log_file}: {e}")
-
-    print(formatted_message)
-
 def parse_channel_block(block):
-    """Parses a single channel block from channels.txt."""
+    """Parse a channel block from channels.txt."""
     channel_data = {}
     lines = block.strip().split('\n')
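Note: parse_channel_block consumes one "Key = Value" block at a time (generate_playlist, further down, splits channels.txt on blank lines). A minimal sketch of the input it expects, assuming the elided body splits each line on " = "; the sample values are invented, and the five keys come from convert_to_channels_txt_block below:

    # Hypothetical channels.txt entry (values invented for illustration);
    # a real file holds many such blocks separated by blank lines.
    sample_block = (
        "Group = News\n"
        "Stream name = Example News HD\n"
        "Logo = https://example.com/logos/news.png\n"
        "EPG id = example.news\n"
        "Stream URL = https://example.com/live/news.m3u8"
    )
    # parse_channel_block(sample_block) should then yield:
    # {'Group': 'News', 'Stream name': 'Example News HD', ...}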
@@ -85,8 +71,10 @@
     return channel_data

 def parse_m3u_entry(extinf_line, url_line):
-    """Parses an M3U #EXTINF and URL line into a dictionary."""
+    """Parse M3U entry."""
     channel = {}
+
+    # Extract attributes
     tvg_id_match = re.search(r'tvg-id="([^"]*)"', extinf_line)
     tvg_logo_match = re.search(r'tvg-logo="([^"]*)"', extinf_line)
     group_title_match = re.search(r'group-title="([^"]*)"', extinf_line)
@@ -97,6 +85,7 @@ def parse_m3u_entry(extinf_line, url_line):
     channel['Group'] = group_title_match.group(1) if group_title_match else 'Uncategorized'
     channel['TVG Name'] = tvg_name_match.group(1) if tvg_name_match else ''

+    # Stream name after the last comma
     stream_name_match = re.search(r',(.+)$', extinf_line)
     channel['Stream name'] = stream_name_match.group(1).strip() if stream_name_match else 'Unknown Channel'
     channel['Stream URL'] = url_line.strip()
@@ -104,50 +93,47 @@ def parse_m3u_entry(extinf_line, url_line):
     return channel

 def apply_group_overrides(channel, group_overrides):
-    """Apply group name overrides to channel."""
+    """Apply group overrides."""
     stream_name = channel.get('Stream name', '').lower()
-    current_group = channel.get('Group', 'Uncategorized')

     for key, new_group in group_overrides.items():
         if key.lower() in stream_name:
-            log_message(f"Override: '{channel.get('Stream name')}' moved from '{current_group}' to '{new_group}'", "DEBUG")
             channel['Group'] = new_group
             break

     return channel

-def convert_m3u_to_channels_txt_block(m3u_channel_data):
-    """Converts a parsed M3U channel entry to channels.txt block format."""
+def convert_to_channels_txt_block(channel_data):
+    """Convert to channels.txt format."""
     block = []
-    block.append(f"Group = {m3u_channel_data.get('Group', 'Uncategorized')}")
-    block.append(f"Stream name = {m3u_channel_data.get('Stream name', 'Unknown Channel')}")
-    block.append(f"Logo = {m3u_channel_data.get('Logo', '')}")
-    block.append(f"EPG id = {m3u_channel_data.get('EPG id', '')}")
-    block.append(f"Stream URL = {m3u_channel_data.get('Stream URL', '')}")
+    block.append(f"Group = {channel_data.get('Group', 'Uncategorized')}")
+    block.append(f"Stream name = {channel_data.get('Stream name', 'Unknown Channel')}")
+    block.append(f"Logo = {channel_data.get('Logo', '')}")
+    block.append(f"EPG id = {channel_data.get('EPG id', '')}")
+    block.append(f"Stream URL = {channel_data.get('Stream URL', '')}")
     return "\n".join(block)

 def get_channel_signature(channel):
-    """Creates a unique signature for a channel to detect duplicates."""
+    """Create unique signature for duplicate detection."""
     stream_name = channel.get('Stream name', '').strip().lower()
     stream_url = channel.get('Stream URL', '').strip().lower()

+    # Clean name
     stream_name_clean = re.sub(r'\s+', ' ', stream_name)
     stream_name_clean = re.sub(r'[^\w\s]', '', stream_name_clean)

     return f"{stream_name_clean}|{stream_url}"

-def remove_duplicates_from_channels(channels, settings):
-    """Removes duplicate channels based on stream name and URL."""
+def remove_duplicates(channels, settings):
+    """Remove duplicate channels."""
     if not settings.get('remove_duplicates', True):
-        log_message("Duplicate removal disabled in settings", "INFO")
+        log_message("Duplicate removal disabled", "INFO")
         return channels

     seen_signatures = set()
     unique_channels = []
     duplicate_count = 0

-    log_message(f"Checking {len(channels)} channels for duplicates...", "DEBUG")
-
     for channel in channels:
         signature = get_channel_signature(channel)
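get_channel_signature drives duplicate detection on both the import and the generation paths. A self-contained sketch (the normalization logic is copied from the hunk above; the two sample entries are invented) showing how cosmetic variants of one channel collapse to a single signature:

    import re

    def get_channel_signature(channel):
        # Same normalization as the patched function above.
        stream_name = channel.get('Stream name', '').strip().lower()
        stream_url = channel.get('Stream URL', '').strip().lower()
        stream_name_clean = re.sub(r'\s+', ' ', stream_name)
        stream_name_clean = re.sub(r'[^\w\s]', '', stream_name_clean)
        return f"{stream_name_clean}|{stream_url}"

    # Extra whitespace, punctuation, and case differ; the signature does not.
    a = {'Stream name': 'Example  News  HD!', 'Stream URL': 'HTTP://example.com/live/1'}
    b = {'Stream name': 'example news hd', 'Stream URL': 'http://example.com/live/1'}
    assert get_channel_signature(a) == get_channel_signature(b)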
@@ -156,68 +142,34 @@
         if signature not in seen_signatures:
             seen_signatures.add(signature)
             unique_channels.append(channel)
         else:
             duplicate_count += 1
-            log_message(f"Duplicate found: {channel.get('Stream name', 'Unknown')} - {channel.get('Stream URL', 'No URL')[:50]}...", "DEBUG")

     if duplicate_count > 0:
-        log_message(f"Removed {duplicate_count} duplicate channels.", "INFO")
+        log_message(f"Removed {duplicate_count} duplicate channels", "INFO")
     else:
-        log_message("No duplicates found.", "INFO")
+        log_message("No duplicates found", "INFO")

     return unique_channels

-def backup_channels_file():
-    """Create a backup of channels.txt before import."""
-    if os.path.exists(CHANNELS_FILE):
-        backup_name = f"{CHANNELS_FILE}.backup.{datetime.now().strftime('%Y%m%d_%H%M%S')}"
-        try:
-            import shutil
-            shutil.copy2(CHANNELS_FILE, backup_name)
-            log_message(f"Created backup: {backup_name}", "INFO")
-            return backup_name
-        except Exception as e:
-            log_message(f"Failed to create backup: {e}", "WARNING", ERROR_LOG)
-    return None
-
-def log_import_statistics(imported_count, duplicate_count, existing_count):
-    """Log import statistics to import history."""
-    stats = {
-        "timestamp": datetime.now().isoformat(),
-        "imported": imported_count,
-        "duplicates_removed": duplicate_count,
-        "already_existed": existing_count,
-        "total_processed": imported_count + duplicate_count + existing_count
-    }
-
-    log_message(f"Import completed: {json.dumps(stats)}", "INFO", IMPORT_LOG)
-
-def process_import_m3u():
-    """Processes bulk_import.m3u to add channels to channels.txt."""
+def process_import():
+    """Process bulk import file."""
     settings = load_settings()
     group_overrides = load_group_overrides()

     if not os.path.exists(IMPORT_FILE):
-        log_message(f"No {IMPORT_FILE} found, skipping import.", "INFO")
+        log_message(f"No {IMPORT_FILE} found, skipping import", "INFO")
         return []

-    log_message(f"Processing {IMPORT_FILE} for bulk import...", "INFO")
-
-    if settings.get('backup_before_import', True):
-        backup_channels_file()
-
-    file_size = os.path.getsize(IMPORT_FILE)
-    log_message(f"{IMPORT_FILE} file size: {file_size} bytes", "DEBUG")
+    log_message(f"Processing {IMPORT_FILE}...", "INFO")

     imported_channels = []

     try:
-        log_message(f"Reading {IMPORT_FILE}...", "DEBUG")
         with open(IMPORT_FILE, 'r', encoding='utf-8') as f:
             lines = f.readlines()

-        log_message(f"Found {len(lines)} lines in {IMPORT_FILE}", "DEBUG")
+        log_message(f"Found {len(lines)} lines in import file", "INFO")

         i = 0
-        parsed_count = 0
-
         while i < len(lines):
             line = lines[i].strip()
             if line.startswith('#EXTINF:'):
@@ -234,11 +186,6 @@

                     if channel_data.get('Stream name') and channel_data.get('Stream URL'):
                         imported_channels.append(channel_data)
-                        parsed_count += 1
-                        if parsed_count <= 3:
-                            log_message(f"Sample channel {parsed_count}: {channel_data.get('Stream name')}", "DEBUG")
-                    else:
-                        log_message(f"Skipping channel - missing required fields: {channel_data.get('Stream name', 'No name')}", "WARNING")

                     i += 2
                 else:
@@ -246,18 +193,13 @@
             else:
                 i += 1

-        log_message(f"Parsed {parsed_count} valid channels from {IMPORT_FILE}", "INFO")
-
-        if imported_channels and settings.get('remove_duplicates', True):
-            log_message("Checking imported channels for duplicates...", "INFO")
-            original_import_count = len(imported_channels)
-            imported_channels = remove_duplicates_from_channels(imported_channels, settings)
-            import_duplicate_count = original_import_count - len(imported_channels)
-            log_message(f"After duplicate removal: {len(imported_channels)} unique channels to import", "INFO")
-        else:
-            import_duplicate_count = 0
+        log_message(f"Parsed {len(imported_channels)} channels from import", "INFO")

+        # Remove duplicates from import
         if imported_channels:
+            imported_channels = remove_duplicates(imported_channels, settings)
+
+            # Check existing channels
             existing_channels = []
             if os.path.exists(CHANNELS_FILE):
                 with open(CHANNELS_FILE, 'r', encoding='utf-8') as f:
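The while loop above steps through the import file two lines at a time, so each channel must be an #EXTINF line immediately followed by its URL line, which the elided body hands to parse_m3u_entry. A sketch of the smallest bulk_import.m3u this accepts (attribute values and URLs are invented):

    # Hypothetical bulk_import.m3u content, one channel:
    sample_import = (
        '#EXTM3U\n'
        '#EXTINF:-1 tvg-id="example.news" tvg-logo="https://example.com/logo.png"'
        ' group-title="News",Example News HD\n'
        'https://example.com/live/news.m3u8\n'
    )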
@@ -267,135 +209,94 @@
                     if block.strip():
                         existing_channels.append(parse_channel_block(block))

-            log_message(f"Found {len(existing_channels)} existing channels in {CHANNELS_FILE}", "DEBUG")
-
             existing_signatures = {get_channel_signature(ch) for ch in existing_channels}
             new_channels = []
-            already_exists_count = 0

             for channel in imported_channels:
                 if get_channel_signature(channel) not in existing_signatures:
                     new_channels.append(channel)
-                else:
-                    already_exists_count += 1
-
-            if already_exists_count > 0:
-                log_message(f"Skipped {already_exists_count} channels that already exist in {CHANNELS_FILE}", "INFO")

             imported_channels = new_channels
-            log_message(f"Final import count: {len(imported_channels)} new unique channels", "INFO")
-        else:
-            already_exists_count = 0
+            log_message(f"Final import: {len(imported_channels)} new channels", "INFO")

+        # Write to channels.txt
         if imported_channels:
             lines_before = 0
             if os.path.exists(CHANNELS_FILE):
                 with open(CHANNELS_FILE, 'r', encoding='utf-8') as f:
                     lines_before = len(f.readlines())
-                log_message(f"{CHANNELS_FILE} has {lines_before} lines before import", "DEBUG")

-            log_message(f"Attempting to append {len(imported_channels)} channels to {CHANNELS_FILE}...", "DEBUG")
-
-            try:
-                with open(CHANNELS_FILE, 'a', encoding='utf-8') as f:
-                    for i, channel in enumerate(imported_channels):
-                        if i > 0 or lines_before > 0:
-                            f.write("\n\n")
-
-                        block_content = convert_m3u_to_channels_txt_block(channel)
-                        f.write(block_content)
-
-                        if i < 3:
-                            log_message(f"Wrote channel {i+1}: {channel.get('Stream name', 'Unknown')}", "DEBUG")
-
-                log_message(f"Successfully appended {len(imported_channels)} unique channels to {CHANNELS_FILE}.", "INFO")
-                log_import_statistics(len(imported_channels), import_duplicate_count, already_exists_count)
-
-            except Exception as write_error:
-                log_message(f"ERROR writing to {CHANNELS_FILE}: {write_error}", "ERROR", ERROR_LOG)
-                return imported_channels
+            with open(CHANNELS_FILE, 'a', encoding='utf-8') as f:
+                for i, channel in enumerate(imported_channels):
+                    if i > 0 or lines_before > 0:
+                        f.write("\n\n")
+
+                    block_content = convert_to_channels_txt_block(channel)
+                    f.write(block_content)
+
+            log_message(f"Successfully imported {len(imported_channels)} channels", "INFO")
         else:
-            log_message("No new unique channels to import after duplicate checking.", "INFO")
-            log_import_statistics(0, import_duplicate_count, already_exists_count)
-    else:
-        log_message(f"No valid channels found in {IMPORT_FILE}.", "INFO")
+            log_message("No new channels to import", "INFO")

     except Exception as e:
-        log_message(f"Error processing {IMPORT_FILE}: {e}", "ERROR", ERROR_LOG)
+        log_message(f"Error processing import: {e}", "ERROR")
         return imported_channels

+    # Clean up import file
     if settings.get('auto_cleanup_import', True):
-        cleanup_import_file()
+        try:
+            os.remove(IMPORT_FILE)
+            log_message(f"Cleaned up {IMPORT_FILE}", "INFO")
+        except Exception as e:
+            log_message(f"Could not remove {IMPORT_FILE}: {e}", "WARNING")

     return imported_channels

-def cleanup_import_file():
-    """Clean up the import file after processing."""
-    log_message(f"Attempting to clean up {IMPORT_FILE}...", "DEBUG")
-    try:
-        os.remove(IMPORT_FILE)
-        log_message(f"Successfully deleted {IMPORT_FILE} after processing.", "INFO")
-    except PermissionError as pe:
-        log_message(f"Permission denied deleting {IMPORT_FILE}: {pe}", "WARNING")
-        try:
-            with open(IMPORT_FILE, 'w', encoding='utf-8') as f:
-                f.write('')
-            log_message(f"Cleared content of {IMPORT_FILE} instead.", "INFO")
-        except Exception as clear_error:
-            log_message(f"ERROR: Could not delete or clear {IMPORT_FILE}: {clear_error}", "ERROR", ERROR_LOG)
-    except Exception as e:
-        log_message(f"Unexpected error deleting {IMPORT_FILE}: {e}", "WARNING")
-
 def generate_playlist():
-    """Main function to generate the M3U playlist."""
-    ensure_directories()
+    """Main function."""
+    # Clear log
+    if os.path.exists(LOG_FILE):
+        open(LOG_FILE, 'w').close()
+
+    log_message("Starting playlist generation...", "INFO")
+
     settings = load_settings()
     group_overrides = load_group_overrides()
-
-    if os.path.exists(MAIN_LOG):
-        open(MAIN_LOG, 'w').close()
-
-    log_message("Starting M3U playlist generation...", "INFO")
-    log_message(f"Settings: {json.dumps(settings, indent=2)}", "DEBUG")

-    imported_channels = process_import_m3u()
-    log_message(f"Import process returned {len(imported_channels)} channels", "DEBUG")
+    # Process import
+    imported_channels = process_import()
+    log_message(f"Import returned {len(imported_channels)} channels", "INFO")

+    # Read channels.txt
     if not os.path.exists(CHANNELS_FILE):
-        log_message(f"Error: {CHANNELS_FILE} not found.", "ERROR", ERROR_LOG)
+        log_message(f"Error: {CHANNELS_FILE} not found", "ERROR")
         return

     with open(CHANNELS_FILE, 'r', encoding='utf-8') as f:
         content = f.read()

-    log_message(f"Read {len(content)} characters from {CHANNELS_FILE}", "DEBUG")
-
+    # Parse channels
     channel_blocks = re.split(r'\n\s*\n+', content.strip())
-    log_message(f"Found {len(channel_blocks)} channel blocks in {CHANNELS_FILE}", "DEBUG")
-
     parsed_channels = []
-    for i, block in enumerate(channel_blocks):
+
+    for block in channel_blocks:
         if block.strip():
             channel = parse_channel_block(block)
             if channel:
                 channel = apply_group_overrides(channel, group_overrides)
                 parsed_channels.append(channel)
-                if i < 5:
-                    log_message(f"Parsed channel {i+1}: {channel.get('Stream name', 'Unknown')}", "DEBUG")

-    log_message(f"Successfully parsed {len(parsed_channels)} channels from {CHANNELS_FILE}", "INFO")
+    log_message(f"Parsed {len(parsed_channels)} channels", "INFO")

-    original_count = len(parsed_channels)
-    parsed_channels = remove_duplicates_from_channels(parsed_channels, settings)
-    final_count = len(parsed_channels)
-
-    if original_count != final_count:
-        log_message(f"Final M3U will have {final_count} unique channels (removed {original_count - final_count} duplicates)", "INFO")
+    # Remove duplicates
+    parsed_channels = remove_duplicates(parsed_channels, settings)

+    # Sort channels
     if settings.get('sort_channels', True):
         parsed_channels.sort(key=lambda x: (x.get('Group', '').lower(), x.get('Stream name', '').lower()))
-        log_message("Channels sorted by group and name", "DEBUG")

-    new_m3u_lines = ["#EXTM3U"]
+    # Build M3U
+    m3u_lines = ["#EXTM3U"]
     valid_channels = 0

     for channel in parsed_channels:
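Both config files stay optional: load_settings merges the file over default_settings, so any subset of keys is valid, and group_overrides maps a case-insensitive substring of a stream name to a replacement group. A sketch of their shapes with invented sample values; note that backup_before_import appears to be left without a consumer now that backup_channels_file and its call site are removed, so it may be worth dropping from the defaults in a follow-up:

    import json

    # Hypothetical config/settings.json: any subset of the default keys.
    settings_json = '{"sort_channels": false}'

    # Hypothetical config/group_overrides.json: name substring -> new group.
    overrides_json = '{"espn": "Sports", "cnn": "News"}'

    # Merge semantics used by load_settings(): file values win over defaults.
    defaults = {"remove_duplicates": True, "sort_channels": True,
                "backup_before_import": True, "auto_cleanup_import": True}
    merged = {**defaults, **json.loads(settings_json)}
    assert merged["sort_channels"] is False and merged["remove_duplicates"] is True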
@@ -406,7 +307,6 @@
         stream_url = channel.get('Stream URL', '')

         if not stream_name or not stream_url:
-            log_message(f"Skipping channel due to missing required field: {stream_name or 'Unknown'}", "WARNING")
             continue

         extinf_attrs = [
@@ -417,27 +317,20 @@
         ]

         extinf_line = f"#EXTINF:-1 {' '.join(extinf_attrs)},{stream_name}"
-        new_m3u_lines.append(extinf_line)
-        new_m3u_lines.append(stream_url)
+        m3u_lines.append(extinf_line)
+        m3u_lines.append(stream_url)
         valid_channels += 1

+    # Write M3U
     try:
         with open(PLAYLIST_FILE, 'w', encoding='utf-8') as f:
-            for line in new_m3u_lines:
+            for line in m3u_lines:
                 f.write(line + '\n')
-        log_message(f"Successfully generated {PLAYLIST_FILE} with {valid_channels} unique channels.", "INFO")
-
-        stats = {
-            "total_channels": valid_channels,
-            "groups": len(set(ch.get('Group', 'Uncategorized') for ch in parsed_channels)),
-            "generation_time": datetime.now().isoformat()
-        }
-        log_message(f"Generation stats: {json.dumps(stats)}", "INFO")
-
+        log_message(f"Generated {PLAYLIST_FILE} with {valid_channels} channels", "INFO")
     except Exception as e:
-        log_message(f"Error writing {PLAYLIST_FILE}: {e}", "ERROR", ERROR_LOG)
+        log_message(f"Error writing playlist: {e}", "ERROR")

-    log_message("M3U playlist generation complete.", "INFO")
+    log_message("Playlist generation complete", "INFO")

 if __name__ == "__main__":
     generate_playlist()
\ No newline at end of file
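For reference, the output this loop produces: one #EXTINF line plus one URL per surviving channel under a single #EXTM3U header. The entry below is invented, and it assumes the elided extinf_attrs list carries the same tvg-id, tvg-logo, group-title, and tvg-name attributes that parse_m3u_entry reads back:

    # Expected shape of the generated playlist.m3u (single sample entry):
    #
    #   #EXTM3U
    #   #EXTINF:-1 tvg-id="example.news" tvg-logo="https://example.com/logo.png" group-title="News" tvg-name="Example News HD",Example News HD
    #   https://example.com/live/news.m3u8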