diff --git a/generate_playlist.py b/scripts/generate_playlist.py
similarity index 65%
rename from generate_playlist.py
rename to scripts/generate_playlist.py
index 7ab8210..af59b45 100644
--- a/generate_playlist.py
+++ b/scripts/generate_playlist.py
@@ -1,19 +1,74 @@
 import re
 import os
+import json
+from datetime import datetime
 
 # --- Configuration ---
 CHANNELS_FILE = 'channels.txt'
 PLAYLIST_FILE = 'playlist.m3u'
 IMPORT_FILE = 'bulk_import.m3u'
-LOG_FILE = 'playlist_update.log'
+LOG_DIR = 'logs'
+CONFIG_DIR = 'config'
+
+# Log files
+MAIN_LOG = os.path.join(LOG_DIR, 'playlist_update.log')
+IMPORT_LOG = os.path.join(LOG_DIR, 'import_history.log')
+ERROR_LOG = os.path.join(LOG_DIR, 'error.log')
+
+# Config files
+SETTINGS_FILE = os.path.join(CONFIG_DIR, 'settings.json')
+GROUP_OVERRIDES_FILE = os.path.join(CONFIG_DIR, 'group_overrides.json')
 
 # --- Helper Functions ---
-def log_message(message, level="INFO"):
-    """Logs messages to a file and prints them."""
-    with open(LOG_FILE, 'a') as f:
-        f.write(f"{level}: {message}\n")
-    print(f"{level}: {message}")
+def ensure_directories():
+    """Create necessary directories if they don't exist."""
+    os.makedirs(LOG_DIR, exist_ok=True)
+    os.makedirs(CONFIG_DIR, exist_ok=True)
+
+def load_settings():
+    """Load settings from config file with defaults."""
+    default_settings = {
+        "remove_duplicates": True,
+        "sort_channels": True,
+        "validate_urls": False,
+        "backup_before_import": True,
+        "auto_cleanup_import": True
+    }
+
+    if os.path.exists(SETTINGS_FILE):
+        try:
+            with open(SETTINGS_FILE, 'r', encoding='utf-8') as f:
+                settings = json.load(f)
+            return {**default_settings, **settings}
+        except Exception as e:
+            log_message(f"Error loading settings, using defaults: {e}", "WARNING", ERROR_LOG)
+
+    return default_settings
+
+def load_group_overrides():
+    """Load group name overrides from config file."""
+    if os.path.exists(GROUP_OVERRIDES_FILE):
+        try:
+            with open(GROUP_OVERRIDES_FILE, 'r', encoding='utf-8') as f:
+                return json.load(f)
+        except Exception as e:
+            log_message(f"Error loading group overrides: {e}", "WARNING", ERROR_LOG)
+
+    return {}
+
+def log_message(message, level="INFO", log_file=MAIN_LOG):
+    """Logs messages to specified file and prints them."""
+    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
+    formatted_message = f"[{timestamp}] {level}: {message}"
+
+    try:
+        with open(log_file, 'a', encoding='utf-8') as f:
+            f.write(formatted_message + "\n")
+    except Exception as e:
+        print(f"ERROR: Could not write to log file {log_file}: {e}")
+
+    print(formatted_message)
 
 def parse_channel_block(block):
     """Parses a single channel block from channels.txt."""
@@ -32,7 +87,6 @@ def parse_channel_block(block):
 def parse_m3u_entry(extinf_line, url_line):
     """Parses an M3U #EXTINF and URL line into a dictionary."""
     channel = {}
-    # Extract attributes using regex
     tvg_id_match = re.search(r'tvg-id="([^"]*)"', extinf_line)
     tvg_logo_match = re.search(r'tvg-logo="([^"]*)"', extinf_line)
     group_title_match = re.search(r'group-title="([^"]*)"', extinf_line)
@@ -43,13 +97,25 @@
     channel['Group'] = group_title_match.group(1) if group_title_match else 'Uncategorized'
     channel['TVG Name'] = tvg_name_match.group(1) if tvg_name_match else ''
 
-    # Stream name is after the last comma
     stream_name_match = re.search(r',(.+)$', extinf_line)
     channel['Stream name'] = stream_name_match.group(1).strip() if stream_name_match else 'Unknown Channel'
     channel['Stream URL'] = url_line.strip()
 
     return channel
 
+def apply_group_overrides(channel, group_overrides):
+    """Apply group name overrides to channel."""
+    stream_name = channel.get('Stream name', '').lower()
+    current_group = channel.get('Group', 'Uncategorized')
+
+    for key, new_group in group_overrides.items():
+        if key.lower() in stream_name:
+            log_message(f"Override: '{channel.get('Stream name')}' moved from '{current_group}' to '{new_group}'", "DEBUG")
+            channel['Group'] = new_group
+            break
+
+    return channel
+
 def convert_m3u_to_channels_txt_block(m3u_channel_data):
     """Converts a parsed M3U channel entry to channels.txt block format."""
     block = []
@@ -62,18 +128,20 @@ def convert_m3u_to_channels_txt_block(m3u_channel_data):
 def get_channel_signature(channel):
     """Creates a unique signature for a channel to detect duplicates."""
-    # Use stream name and URL as the primary identifier
     stream_name = channel.get('Stream name', '').strip().lower()
     stream_url = channel.get('Stream URL', '').strip().lower()
 
-    # Clean up stream name for comparison (remove common variations)
-    stream_name_clean = re.sub(r'\s+', ' ', stream_name)  # Normalize whitespace
-    stream_name_clean = re.sub(r'[^\w\s]', '', stream_name_clean)  # Remove special chars
+    stream_name_clean = re.sub(r'\s+', ' ', stream_name)
+    stream_name_clean = re.sub(r'[^\w\s]', '', stream_name_clean)
 
     return f"{stream_name_clean}|{stream_url}"
 
-def remove_duplicates_from_channels(channels):
+def remove_duplicates_from_channels(channels, settings):
     """Removes duplicate channels based on stream name and URL."""
+    if not settings.get('remove_duplicates', True):
+        log_message("Duplicate removal disabled in settings", "INFO")
+        return channels
+
     seen_signatures = set()
     unique_channels = []
     duplicate_count = 0
@@ -97,22 +165,51 @@
     return unique_channels
 
+def backup_channels_file():
+    """Create a backup of channels.txt before import."""
+    if os.path.exists(CHANNELS_FILE):
+        backup_name = f"{CHANNELS_FILE}.backup.{datetime.now().strftime('%Y%m%d_%H%M%S')}"
+        try:
+            import shutil
+            shutil.copy2(CHANNELS_FILE, backup_name)
+            log_message(f"Created backup: {backup_name}", "INFO")
+            return backup_name
+        except Exception as e:
+            log_message(f"Failed to create backup: {e}", "WARNING", ERROR_LOG)
+    return None
+
+def log_import_statistics(imported_count, duplicate_count, existing_count):
+    """Log import statistics to import history."""
+    stats = {
+        "timestamp": datetime.now().isoformat(),
+        "imported": imported_count,
+        "duplicates_removed": duplicate_count,
+        "already_existed": existing_count,
+        "total_processed": imported_count + duplicate_count + existing_count
+    }
+
+    log_message(f"Import completed: {json.dumps(stats)}", "INFO", IMPORT_LOG)
+
 def process_import_m3u():
     """Processes bulk_import.m3u to add channels to channels.txt."""
+    settings = load_settings()
+    group_overrides = load_group_overrides()
+
     if not os.path.exists(IMPORT_FILE):
         log_message(f"No {IMPORT_FILE} found, skipping import.", "INFO")
         return []
 
     log_message(f"Processing {IMPORT_FILE} for bulk import...", "INFO")
 
-    # Check file size first
+    if settings.get('backup_before_import', True):
+        backup_channels_file()
+
     file_size = os.path.getsize(IMPORT_FILE)
     log_message(f"{IMPORT_FILE} file size: {file_size} bytes", "DEBUG")
 
    imported_channels = []
 
     try:
-        # Read the import file
         log_message(f"Reading {IMPORT_FILE}...", "DEBUG")
         with open(IMPORT_FILE, 'r', encoding='utf-8') as f:
             lines = f.readlines()
@@ -128,18 +225,17 @@
                 extinf_line = line
                 url_line = lines[i+1].strip()
 
-                # Skip empty URLs
                 if not url_line or url_line.startswith('#'):
                     i += 1
                     continue
 
                 channel_data = parse_m3u_entry(extinf_line, url_line)
+                channel_data = apply_group_overrides(channel_data, group_overrides)
 
-                # Only require Stream name and Stream URL
                 if channel_data.get('Stream name') and channel_data.get('Stream URL'):
                     imported_channels.append(channel_data)
                     parsed_count += 1
-                    if parsed_count <= 3:  # Show first 3 for debugging
+                    if parsed_count <= 3:
                         log_message(f"Sample channel {parsed_count}: {channel_data.get('Stream name')}", "DEBUG")
                 else:
                     log_message(f"Skipping channel - missing required fields: {channel_data.get('Stream name', 'No name')}", "WARNING")
@@ -152,14 +248,16 @@
         log_message(f"Parsed {parsed_count} valid channels from {IMPORT_FILE}", "INFO")
 
-        # **NEW: Remove duplicates from imported channels before adding to channels.txt**
-        if imported_channels:
+        if imported_channels and settings.get('remove_duplicates', True):
             log_message("Checking imported channels for duplicates...", "INFO")
-            imported_channels = remove_duplicates_from_channels(imported_channels)
+            original_import_count = len(imported_channels)
+            imported_channels = remove_duplicates_from_channels(imported_channels, settings)
+            import_duplicate_count = original_import_count - len(imported_channels)
             log_message(f"After duplicate removal: {len(imported_channels)} unique channels to import", "INFO")
+        else:
+            import_duplicate_count = 0
 
         if imported_channels:
-            # Check if channels already exist in channels.txt to prevent re-importing
             existing_channels = []
             if os.path.exists(CHANNELS_FILE):
                 with open(CHANNELS_FILE, 'r', encoding='utf-8') as f:
@@ -171,7 +269,6 @@
                 log_message(f"Found {len(existing_channels)} existing channels in {CHANNELS_FILE}", "DEBUG")
 
-                # Check for duplicates between existing and imported
                 existing_signatures = {get_channel_signature(ch) for ch in existing_channels}
                 new_channels = []
                 already_exists_count = 0
@@ -187,134 +284,120 @@
                 imported_channels = new_channels
                 log_message(f"Final import count: {len(imported_channels)} new unique channels", "INFO")
-
-                # Check channels.txt before writing
-                lines_before = 0
-                if os.path.exists(CHANNELS_FILE):
-                    with open(CHANNELS_FILE, 'r', encoding='utf-8') as f:
-                        lines_before = len(f.readlines())
-                    log_message(f"{CHANNELS_FILE} has {lines_before} lines before import", "DEBUG")
             else:
-                log_message(f"{CHANNELS_FILE} does not exist, will create it", "DEBUG")
+                already_exists_count = 0
 
-            # Append to channels.txt
             if imported_channels:
+                lines_before = 0
+                if os.path.exists(CHANNELS_FILE):
+                    with open(CHANNELS_FILE, 'r', encoding='utf-8') as f:
+                        lines_before = len(f.readlines())
+                    log_message(f"{CHANNELS_FILE} has {lines_before} lines before import", "DEBUG")
+
                 log_message(f"Attempting to append {len(imported_channels)} channels to {CHANNELS_FILE}...", "DEBUG")
                 try:
                     with open(CHANNELS_FILE, 'a', encoding='utf-8') as f:
                         for i, channel in enumerate(imported_channels):
-                            # Add separators between channels
                             if i > 0 or lines_before > 0:
                                 f.write("\n\n")
                             block_content = convert_m3u_to_channels_txt_block(channel)
                             f.write(block_content)
-                            if i < 3:  # Log first 3 for debugging
+                            if i < 3:
                                 log_message(f"Wrote channel {i+1}: {channel.get('Stream name', 'Unknown')}", "DEBUG")
 
                     log_message(f"Successfully appended {len(imported_channels)} unique channels to {CHANNELS_FILE}.", "INFO")
-
-                    # Verify the write worked
-                    with open(CHANNELS_FILE, 'r', encoding='utf-8') as f:
-                        lines_after = len(f.readlines())
log_message(f"Verification: {CHANNELS_FILE} now has {lines_after} lines (was {lines_before})", "INFO") + log_import_statistics(len(imported_channels), import_duplicate_count, already_exists_count) except Exception as write_error: - log_message(f"ERROR writing to {CHANNELS_FILE}: {write_error}", "ERROR") - return imported_channels # Return anyway for M3U generation + log_message(f"ERROR writing to {CHANNELS_FILE}: {write_error}", "ERROR", ERROR_LOG) + return imported_channels else: log_message("No new unique channels to import after duplicate checking.", "INFO") - + log_import_statistics(0, import_duplicate_count, already_exists_count) else: log_message(f"No valid channels found in {IMPORT_FILE}.", "INFO") except Exception as e: - log_message(f"Error processing {IMPORT_FILE}: {e}", "ERROR") + log_message(f"Error processing {IMPORT_FILE}: {e}", "ERROR", ERROR_LOG) return imported_channels - # Clean up the import file + if settings.get('auto_cleanup_import', True): + cleanup_import_file() + + return imported_channels + +def cleanup_import_file(): + """Clean up the import file after processing.""" log_message(f"Attempting to clean up {IMPORT_FILE}...", "DEBUG") try: - # First try to delete os.remove(IMPORT_FILE) log_message(f"Successfully deleted {IMPORT_FILE} after processing.", "INFO") except PermissionError as pe: log_message(f"Permission denied deleting {IMPORT_FILE}: {pe}", "WARNING") - # Try to clear instead - try: - with open(IMPORT_FILE, 'w', encoding='utf-8') as f: - f.write('') # Clear the file content - log_message(f"Cleared content of {IMPORT_FILE} instead.", "INFO") - except Exception as clear_error: - log_message(f"ERROR: Could not delete or clear {IMPORT_FILE}: {clear_error}", "ERROR") - except Exception as e: - log_message(f"Unexpected error deleting {IMPORT_FILE}: {e}", "WARNING") - # Try to clear instead try: with open(IMPORT_FILE, 'w', encoding='utf-8') as f: f.write('') log_message(f"Cleared content of {IMPORT_FILE} instead.", "INFO") except Exception as clear_error: - log_message(f"ERROR: Could not delete or clear {IMPORT_FILE}: {clear_error}", "ERROR") - - return imported_channels + log_message(f"ERROR: Could not delete or clear {IMPORT_FILE}: {clear_error}", "ERROR", ERROR_LOG) + except Exception as e: + log_message(f"Unexpected error deleting {IMPORT_FILE}: {e}", "WARNING") def generate_playlist(): """Main function to generate the M3U playlist.""" - # Clear previous log content - if os.path.exists(LOG_FILE): - open(LOG_FILE, 'w').close() + ensure_directories() + settings = load_settings() + group_overrides = load_group_overrides() + + if os.path.exists(MAIN_LOG): + open(MAIN_LOG, 'w').close() log_message("Starting M3U playlist generation...", "INFO") + log_message(f"Settings: {json.dumps(settings, indent=2)}", "DEBUG") - # Process import file first and get the imported channels imported_channels = process_import_m3u() log_message(f"Import process returned {len(imported_channels)} channels", "DEBUG") - # Now read from channels.txt (which should include the imported channels) if not os.path.exists(CHANNELS_FILE): - log_message(f"Error: {CHANNELS_FILE} not found.", "ERROR") + log_message(f"Error: {CHANNELS_FILE} not found.", "ERROR", ERROR_LOG) return - # Read channels.txt with open(CHANNELS_FILE, 'r', encoding='utf-8') as f: content = f.read() log_message(f"Read {len(content)} characters from {CHANNELS_FILE}", "DEBUG") - # Split content into blocks by two or more newlines channel_blocks = re.split(r'\n\s*\n+', content.strip()) log_message(f"Found {len(channel_blocks)} 
     log_message(f"Found {len(channel_blocks)} channel blocks in {CHANNELS_FILE}", "DEBUG")
 
     parsed_channels = []
-
     for i, block in enumerate(channel_blocks):
         if block.strip():
             channel = parse_channel_block(block)
             if channel:
+                channel = apply_group_overrides(channel, group_overrides)
                 parsed_channels.append(channel)
-                if i < 5:  # Log first 5 for debugging
+                if i < 5:
                     log_message(f"Parsed channel {i+1}: {channel.get('Stream name', 'Unknown')}", "DEBUG")
 
     log_message(f"Successfully parsed {len(parsed_channels)} channels from {CHANNELS_FILE}", "INFO")
 
-    # **NEW: Remove duplicates from all channels before generating M3U**
-    log_message("Checking all channels for duplicates before generating M3U...", "INFO")
     original_count = len(parsed_channels)
-    parsed_channels = remove_duplicates_from_channels(parsed_channels)
+    parsed_channels = remove_duplicates_from_channels(parsed_channels, settings)
    final_count = len(parsed_channels)
 
     if original_count != final_count:
         log_message(f"Final M3U will have {final_count} unique channels (removed {original_count - final_count} duplicates)", "INFO")
 
-    # Start building M3U
+    if settings.get('sort_channels', True):
+        parsed_channels.sort(key=lambda x: (x.get('Group', '').lower(), x.get('Stream name', '').lower()))
+        log_message("Channels sorted by group and name", "DEBUG")
+
     new_m3u_lines = ["#EXTM3U"]
-
-    # Sort channels by Group then Stream name
-    parsed_channels.sort(key=lambda x: (x.get('Group', '').lower(), x.get('Stream name', '').lower()))
-
     valid_channels = 0
+
     for channel in parsed_channels:
         stream_name = channel.get('Stream name', '')
         group_name = channel.get('Group', 'Uncategorized')
@@ -322,7 +405,6 @@ def generate_playlist():
         epg_id = channel.get('EPG id', '')
         stream_url = channel.get('Stream URL', '')
 
-        # Only require Stream name and Stream URL
         if not stream_name or not stream_url:
             log_message(f"Skipping channel due to missing required field: {stream_name or 'Unknown'}", "WARNING")
             continue
@@ -339,14 +421,21 @@ def generate_playlist():
         new_m3u_lines.append(stream_url)
         valid_channels += 1
 
-    # Write the new M3U file
     try:
         with open(PLAYLIST_FILE, 'w', encoding='utf-8') as f:
             for line in new_m3u_lines:
                 f.write(line + '\n')
 
         log_message(f"Successfully generated {PLAYLIST_FILE} with {valid_channels} unique channels.", "INFO")
+
+        stats = {
+            "total_channels": valid_channels,
+            "groups": len(set(ch.get('Group', 'Uncategorized') for ch in parsed_channels)),
+            "generation_time": datetime.now().isoformat()
+        }
+        log_message(f"Generation stats: {json.dumps(stats)}", "INFO")
+
     except Exception as e:
-        log_message(f"Error writing {PLAYLIST_FILE}: {e}", "ERROR")
+        log_message(f"Error writing {PLAYLIST_FILE}: {e}", "ERROR", ERROR_LOG)
 
     log_message("M3U playlist generation complete.", "INFO")
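Example config files (illustrative only, not part of the diff). The keys and defaults in config/settings.json mirror default_settings in load_settings(); values in the file override the defaults:

    {
        "remove_duplicates": true,
        "sort_channels": false,
        "validate_urls": false,
        "backup_before_import": true,
        "auto_cleanup_import": false
    }

config/group_overrides.json maps a case-insensitive substring of a stream name to a replacement group, per the lookup in apply_group_overrides(); the entries below are hypothetical:

    {
        "espn": "Sports",
        "bbc": "UK",
        "cnn": "News"
    }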
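For reference, a minimal sketch of how get_channel_signature() collapses near-identical entries (assumes the module is importable as scripts.generate_playlist; the channel dicts are made up):

    from scripts.generate_playlist import get_channel_signature

    a = {'Stream name': 'CNN  HD!', 'Stream URL': 'http://example.com/cnn'}
    b = {'Stream name': 'cnn hd', 'Stream URL': 'HTTP://EXAMPLE.COM/cnn'}

    # Whitespace is normalized, punctuation is stripped, and both name and
    # URL are lower-cased, so the two entries yield the same signature and
    # the second would be dropped by remove_duplicates_from_channels().
    print(get_channel_signature(a) == get_channel_signature(b))  # True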