"""Generate an M3U playlist from a plain-text channel list.

Workflow (see ``generate_playlist``):

1. If ``bulk_import.m3u`` exists, parse it, drop duplicates, append the new
   channels to ``channels.txt`` and remove (or clear) the import file.
2. Parse ``channels.txt`` (blocks of ``Key = Value`` lines separated by blank
   lines), drop duplicates, sort, and write ``playlist.m3u``.

Every step is logged to ``playlist_update.log`` and echoed to stdout.
"""

import os
import re

# --- Configuration ---
CHANNELS_FILE = 'channels.txt'
PLAYLIST_FILE = 'playlist.m3u'
IMPORT_FILE = 'bulk_import.m3u'
LOG_FILE = 'playlist_update.log'

# Pre-compiled patterns shared by the helpers below.
_BLOCK_SEPARATOR_RE = re.compile(r'\n\s*\n+')   # blank line(s) between channel blocks
_EXTINF_NAME_RE = re.compile(r',(.+)$')         # display name after the separator comma
_WHITESPACE_RE = re.compile(r'\s+')
_PUNCTUATION_RE = re.compile(r'[^\w\s]')


# --- Helper Functions ---

def log_message(message, level="INFO"):
    """Append ``"<level>: <message>"`` to LOG_FILE and print the same text.

    The log file is opened as UTF-8 so channel names containing non-ASCII
    characters do not depend on the platform's default encoding.
    """
    entry = f"{level}: {message}"
    with open(LOG_FILE, 'a', encoding='utf-8') as f:
        f.write(entry + "\n")
    print(entry)


def parse_channel_block(block):
    """Parse one channels.txt block into a dict.

    Each line containing ``=`` is split on the first ``=``; key and value are
    stripped of surrounding whitespace. Lines without ``=`` are ignored.
    """
    channel_data = {}
    for line in block.strip().split('\n'):
        key, separator, value = line.partition('=')
        if separator:
            channel_data[key.strip()] = value.strip()
    return channel_data


def _extract_attribute(extinf_line, attribute, default=''):
    """Return the value of ``attribute="..."`` in the line, or ``default``."""
    match = re.search(rf'{attribute}="([^"]*)"', extinf_line)
    return match.group(1) if match else default


def _name_search_start(extinf_line):
    """Return the index of the first comma outside double quotes.

    Returns 0 when there is no such comma (e.g. unbalanced quotes), which
    makes the caller fall back to searching from the start of the line.
    """
    inside_quotes = False
    for position, char in enumerate(extinf_line):
        if char == '"':
            inside_quotes = not inside_quotes
        elif char == ',' and not inside_quotes:
            return position
    return 0


def parse_m3u_entry(extinf_line, url_line):
    """Parse an M3U ``#EXTINF`` line and its URL line into a channel dict.

    Returns a dict with the keys ``EPG id``, ``Logo``, ``Group``,
    ``TVG Name``, ``Stream name`` and ``Stream URL``. A missing
    ``group-title`` yields ``'Uncategorized'``; a missing display name yields
    ``'Unknown Channel'``; other missing attributes yield ``''``.
    """
    channel = {
        'EPG id': _extract_attribute(extinf_line, 'tvg-id'),
        'Logo': _extract_attribute(extinf_line, 'tvg-logo'),
        'Group': _extract_attribute(extinf_line, 'group-title', 'Uncategorized'),
        'TVG Name': _extract_attribute(extinf_line, 'tvg-name'),
    }

    # The display name follows the comma that ends the attribute list. Start
    # the search at the first comma OUTSIDE quotes so that a comma inside an
    # attribute value (group-title="News, World") is not mistaken for it.
    name_match = _EXTINF_NAME_RE.search(extinf_line, _name_search_start(extinf_line))
    channel['Stream name'] = name_match.group(1).strip() if name_match else 'Unknown Channel'
    channel['Stream URL'] = url_line.strip()
    return channel


def convert_m3u_to_channels_txt_block(m3u_channel_data):
    """Render a parsed M3U channel dict as a channels.txt block (no trailing newline)."""
    get = m3u_channel_data.get
    return "\n".join([
        f"Group = {get('Group', 'Uncategorized')}",
        f"Stream name = {get('Stream name', 'Unknown Channel')}",
        f"Logo = {get('Logo', '')}",
        f"EPG id = {get('EPG id', '')}",
        f"Stream URL = {get('Stream URL', '')}",
    ])


def get_channel_signature(channel):
    """Return ``"<normalized name>|<url>"`` used to detect duplicate channels.

    Both parts are stripped and lower-cased; in the name, whitespace runs are
    collapsed to one space and characters that are neither word characters
    nor whitespace are removed.
    """
    stream_name = channel.get('Stream name', '').strip().lower()
    stream_url = channel.get('Stream URL', '').strip().lower()

    normalized_name = _WHITESPACE_RE.sub(' ', stream_name)
    normalized_name = _PUNCTUATION_RE.sub('', normalized_name)
    return f"{normalized_name}|{stream_url}"


def remove_duplicates_from_channels(channels):
    """Return ``channels`` without duplicates, keeping first occurrences in order.

    Duplicates are detected with ``get_channel_signature``; each one is logged
    at DEBUG level and a summary is logged at INFO level.
    """
    seen_signatures = set()
    unique_channels = []

    log_message(f"Checking {len(channels)} channels for duplicates...", "DEBUG")

    for channel in channels:
        signature = get_channel_signature(channel)
        if signature in seen_signatures:
            log_message(
                f"Duplicate found: {channel.get('Stream name', 'Unknown')} - "
                f"{channel.get('Stream URL', 'No URL')[:50]}...",
                "DEBUG",
            )
            continue
        seen_signatures.add(signature)
        unique_channels.append(channel)

    duplicate_count = len(channels) - len(unique_channels)
    if duplicate_count > 0:
        log_message(f"Removed {duplicate_count} duplicate channels.", "INFO")
    else:
        log_message("No duplicates found.", "INFO")

    return unique_channels


def _split_channel_blocks(content):
    """Split channels.txt content into blocks separated by blank line(s)."""
    return _BLOCK_SEPARATOR_RE.split(content.strip())


def _parse_m3u_lines(lines):
    """Pair every ``#EXTINF`` line with its stream URL and parse the pair.

    The URL is the next line that is neither blank nor a ``#`` directive, so
    entries with ``#EXTVLCOPT``/``#EXTGRP`` lines or blank lines between the
    ``#EXTINF`` and the URL are still imported. If another ``#EXTINF`` (or the
    end of the file) is reached first, the entry has no URL and is skipped.
    """
    channels = []
    total = len(lines)
    index = 0
    while index < total:
        extinf_line = lines[index].strip()
        if not extinf_line.startswith('#EXTINF:'):
            index += 1
            continue

        # Look ahead for the URL belonging to this entry.
        cursor = index + 1
        url_line = ''
        while cursor < total:
            candidate = lines[cursor].strip()
            if candidate.startswith('#EXTINF:'):
                break  # next entry starts: this one has no URL
            if candidate and not candidate.startswith('#'):
                url_line = candidate
                break
            cursor += 1

        if not url_line:
            index = cursor  # resume at the next #EXTINF (or end of file)
            continue

        channel_data = parse_m3u_entry(extinf_line, url_line)

        # Only Stream name and Stream URL are required.
        if channel_data.get('Stream name') and channel_data.get('Stream URL'):
            channels.append(channel_data)
            if len(channels) <= 3:  # show the first 3 for debugging
                log_message(
                    f"Sample channel {len(channels)}: {channel_data.get('Stream name')}",
                    "DEBUG",
                )
        else:
            log_message(
                f"Skipping channel - missing required fields: "
                f"{channel_data.get('Stream name', 'No name')}",
                "WARNING",
            )
        index = cursor + 1
    return channels


def _drop_existing_channels(imported_channels):
    """Return the imported channels that are not already in CHANNELS_FILE."""
    existing_channels = []
    if os.path.exists(CHANNELS_FILE):
        # utf-8-sig transparently drops a BOM that would otherwise end up
        # glued to the first key of the first block.
        with open(CHANNELS_FILE, 'r', encoding='utf-8-sig') as f:
            content = f.read()
        existing_channels = [
            parse_channel_block(block)
            for block in _split_channel_blocks(content)
            if block.strip()
        ]
        log_message(
            f"Found {len(existing_channels)} existing channels in {CHANNELS_FILE}",
            "DEBUG",
        )

    existing_signatures = {get_channel_signature(ch) for ch in existing_channels}
    new_channels = [
        channel for channel in imported_channels
        if get_channel_signature(channel) not in existing_signatures
    ]

    already_exists_count = len(imported_channels) - len(new_channels)
    if already_exists_count > 0:
        log_message(
            f"Skipped {already_exists_count} channels that already exist in {CHANNELS_FILE}",
            "INFO",
        )
    return new_channels


def _count_channels_file_lines():
    """Return the number of lines in CHANNELS_FILE (0 if missing), logging the result."""
    if not os.path.exists(CHANNELS_FILE):
        log_message(f"{CHANNELS_FILE} does not exist, will create it", "DEBUG")
        return 0
    with open(CHANNELS_FILE, 'r', encoding='utf-8') as f:
        lines_before = len(f.readlines())
    log_message(f"{CHANNELS_FILE} has {lines_before} lines before import", "DEBUG")
    return lines_before


def _append_channels(channels, lines_before):
    """Append channel blocks to CHANNELS_FILE; return False if writing failed."""
    log_message(
        f"Attempting to append {len(channels)} channels to {CHANNELS_FILE}...",
        "DEBUG",
    )
    try:
        with open(CHANNELS_FILE, 'a', encoding='utf-8') as f:
            for index, channel in enumerate(channels):
                # Blank-line separator before every block except the very
                # first one written into an empty/new file.
                if index > 0 or lines_before > 0:
                    f.write("\n\n")
                f.write(convert_m3u_to_channels_txt_block(channel))
                if index < 3:  # log the first 3 for debugging
                    log_message(
                        f"Wrote channel {index + 1}: {channel.get('Stream name', 'Unknown')}",
                        "DEBUG",
                    )

        log_message(
            f"Successfully appended {len(channels)} unique channels to {CHANNELS_FILE}.",
            "INFO",
        )

        # Verify the write worked.
        with open(CHANNELS_FILE, 'r', encoding='utf-8') as f:
            lines_after = len(f.readlines())
        log_message(
            f"Verification: {CHANNELS_FILE} now has {lines_after} lines (was {lines_before})",
            "INFO",
        )
    except Exception as write_error:
        log_message(f"ERROR writing to {CHANNELS_FILE}: {write_error}", "ERROR")
        return False
    return True


def _cleanup_import_file():
    """Delete IMPORT_FILE; if deletion fails, try to truncate it instead."""
    log_message(f"Attempting to clean up {IMPORT_FILE}...", "DEBUG")
    try:
        os.remove(IMPORT_FILE)
        log_message(f"Successfully deleted {IMPORT_FILE} after processing.", "INFO")
        return
    except PermissionError as pe:
        log_message(f"Permission denied deleting {IMPORT_FILE}: {pe}", "WARNING")
    except Exception as e:
        log_message(f"Unexpected error deleting {IMPORT_FILE}: {e}", "WARNING")

    # Deletion failed: empty the file so it is not re-imported on the next run.
    try:
        with open(IMPORT_FILE, 'w', encoding='utf-8') as f:
            f.write('')
        log_message(f"Cleared content of {IMPORT_FILE} instead.", "INFO")
    except Exception as clear_error:
        log_message(
            f"ERROR: Could not delete or clear {IMPORT_FILE}: {clear_error}",
            "ERROR",
        )


def process_import_m3u():
    """Import channels from IMPORT_FILE into CHANNELS_FILE.

    Parses the import file, removes duplicates (within the import and against
    the channels already in CHANNELS_FILE), appends what is left to
    CHANNELS_FILE and finally deletes or clears the import file.

    Returns the list of channel dicts selected for import, or ``[]`` when
    there is no import file. If processing or writing fails, the error is
    logged, the import file is left untouched, and the channels collected so
    far are returned.
    """
    if not os.path.exists(IMPORT_FILE):
        log_message(f"No {IMPORT_FILE} found, skipping import.", "INFO")
        return []

    log_message(f"Processing {IMPORT_FILE} for bulk import...", "INFO")

    file_size = os.path.getsize(IMPORT_FILE)
    log_message(f"{IMPORT_FILE} file size: {file_size} bytes", "DEBUG")

    imported_channels = []
    try:
        log_message(f"Reading {IMPORT_FILE}...", "DEBUG")
        # utf-8-sig: playlists saved by some editors start with a BOM.
        with open(IMPORT_FILE, 'r', encoding='utf-8-sig') as f:
            lines = f.readlines()
        log_message(f"Found {len(lines)} lines in {IMPORT_FILE}", "DEBUG")

        imported_channels = _parse_m3u_lines(lines)
        log_message(
            f"Parsed {len(imported_channels)} valid channels from {IMPORT_FILE}",
            "INFO",
        )

        if not imported_channels:
            log_message(f"No valid channels found in {IMPORT_FILE}.", "INFO")
        else:
            # Remove duplicates inside the import itself first.
            log_message("Checking imported channels for duplicates...", "INFO")
            imported_channels = remove_duplicates_from_channels(imported_channels)
            log_message(
                f"After duplicate removal: {len(imported_channels)} unique channels to import",
                "INFO",
            )

            # Then drop channels that channels.txt already contains.
            imported_channels = _drop_existing_channels(imported_channels)
            log_message(
                f"Final import count: {len(imported_channels)} new unique channels",
                "INFO",
            )

            lines_before = _count_channels_file_lines()

            if imported_channels:
                if not _append_channels(imported_channels, lines_before):
                    # Keep the import file so nothing is lost; return the
                    # channels anyway for the caller's reporting.
                    return imported_channels
            else:
                log_message(
                    "No new unique channels to import after duplicate checking.",
                    "INFO",
                )
    except Exception as e:
        log_message(f"Error processing {IMPORT_FILE}: {e}", "ERROR")
        return imported_channels

    _cleanup_import_file()
    return imported_channels


def generate_playlist():
    """Run the bulk import, then rebuild PLAYLIST_FILE from CHANNELS_FILE.

    The previous log is cleared first. Channels are de-duplicated and sorted
    by group, then stream name (case-insensitive); entries lacking a stream
    name or stream URL are skipped with a warning. Returns ``None``; if
    CHANNELS_FILE does not exist an error is logged and nothing is written.
    """
    # Clear previous log content.
    if os.path.exists(LOG_FILE):
        with open(LOG_FILE, 'w', encoding='utf-8'):
            pass

    log_message("Starting M3U playlist generation...", "INFO")

    # Process the import file first so channels.txt includes the new entries.
    imported_channels = process_import_m3u()
    log_message(f"Import process returned {len(imported_channels)} channels", "DEBUG")

    if not os.path.exists(CHANNELS_FILE):
        log_message(f"Error: {CHANNELS_FILE} not found.", "ERROR")
        return

    with open(CHANNELS_FILE, 'r', encoding='utf-8-sig') as f:
        content = f.read()
    log_message(f"Read {len(content)} characters from {CHANNELS_FILE}", "DEBUG")

    channel_blocks = _split_channel_blocks(content)
    log_message(f"Found {len(channel_blocks)} channel blocks in {CHANNELS_FILE}", "DEBUG")

    parsed_channels = []
    for block_index, block in enumerate(channel_blocks):
        if not block.strip():
            continue
        channel = parse_channel_block(block)
        if not channel:
            continue
        parsed_channels.append(channel)
        if block_index < 5:  # log the first 5 for debugging
            log_message(
                f"Parsed channel {block_index + 1}: {channel.get('Stream name', 'Unknown')}",
                "DEBUG",
            )

    log_message(
        f"Successfully parsed {len(parsed_channels)} channels from {CHANNELS_FILE}",
        "INFO",
    )

    # Remove duplicates from all channels before generating the M3U.
    log_message("Checking all channels for duplicates before generating M3U...", "INFO")
    original_count = len(parsed_channels)
    parsed_channels = remove_duplicates_from_channels(parsed_channels)
    final_count = len(parsed_channels)
    if original_count != final_count:
        log_message(
            f"Final M3U will have {final_count} unique channels "
            f"(removed {original_count - final_count} duplicates)",
            "INFO",
        )

    # Sort channels by Group, then Stream name.
    parsed_channels.sort(
        key=lambda ch: (ch.get('Group', '').lower(), ch.get('Stream name', '').lower())
    )

    new_m3u_lines = ["#EXTM3U"]
    valid_channels = 0
    for channel in parsed_channels:
        stream_name = channel.get('Stream name', '')
        group_name = channel.get('Group', 'Uncategorized')
        logo_url = channel.get('Logo', '')
        epg_id = channel.get('EPG id', '')
        stream_url = channel.get('Stream URL', '')

        # Only Stream name and Stream URL are required.
        if not stream_name or not stream_url:
            log_message(
                f"Skipping channel due to missing required field: {stream_name or 'Unknown'}",
                "WARNING",
            )
            continue

        extinf_attrs = " ".join([
            f'tvg-id="{epg_id}"',
            f'tvg-logo="{logo_url}"',
            f'group-title="{group_name}"',
            f'tvg-name="{stream_name}"',
        ])
        new_m3u_lines.append(f"#EXTINF:-1 {extinf_attrs},{stream_name}")
        new_m3u_lines.append(stream_url)
        valid_channels += 1

    # Write the new M3U file (one line per entry, newline-terminated).
    try:
        with open(PLAYLIST_FILE, 'w', encoding='utf-8') as f:
            f.write("\n".join(new_m3u_lines) + "\n")
        log_message(
            f"Successfully generated {PLAYLIST_FILE} with {valid_channels} unique channels.",
            "INFO",
        )
    except Exception as e:
        log_message(f"Error writing {PLAYLIST_FILE}: {e}", "ERROR")

    log_message("M3U playlist generation complete.", "INFO")


if __name__ == "__main__":
    generate_playlist()