my-private-iptv-m3u/scripts/generate_playlist.py
stoney420 9abc1a2e99
Some checks failed
Generate M3U Playlist with Auto-Organization / build-and-organize (push) Failing after 14s
Update scripts/generate_playlist.py
2025-06-28 23:41:12 +02:00

333 lines
No EOL
13 KiB
Python

#!/usr/bin/env python3
"""
IPTV Playlist Generator - FIXED to run from root directory
Enhanced debugging with working imports and method calls
"""
import logging
import os
import sys
from datetime import datetime
from pathlib import Path
# FIXED: Change to root directory and add scripts to path
script_dir = Path(__file__).parent
root_dir = script_dir.parent
# Change working directory to root
os.chdir(root_dir)
# Add scripts directory to Python path
sys.path.insert(0, str(script_dir))
# FIXED: Import our modular components with proper error handling
try:
from config_manager import ConfigManager
from channel_processor import ChannelProcessor
from file_manager import FileManager
from playlist_builder import PlaylistBuilder
from health_checker import HealthChecker
from report_generator import ReportGenerator
except ImportError as e:
print(f"Import error: {e}")
print("Make sure all required modules are in the scripts directory")
sys.exit(1)
def setup_logging():
"""Setup comprehensive logging."""
logging.basicConfig(
level=logging.INFO, # Changed back to INFO for cleaner output
format='[%(asctime)s] %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
handlers=[
logging.FileHandler('playlist_update.log', encoding='utf-8'),
logging.StreamHandler()
]
)
def debug_file_system():
"""Debug the file system to see what files exist."""
logging.info("=== FILE SYSTEM DEBUG ===")
# Check current directory
current_dir = os.getcwd()
logging.info(f"Current working directory: {current_dir}")
# List all files in current directory
try:
files = os.listdir('.')
logging.info(f"Files in current directory: {len(files)} files")
except Exception as e:
logging.error(f"Could not list current directory: {e}")
# Check for specific files
files_to_check = [
'bulk_import.m3u',
'channels.txt',
'playlist.m3u'
]
for file_path in files_to_check:
if os.path.exists(file_path):
try:
size = os.path.getsize(file_path)
logging.info(f"✅ Found {file_path} (size: {size} bytes)")
# If it's the import file, show first few lines
if 'bulk_import.m3u' in file_path and size > 0:
with open(file_path, 'r', encoding='utf-8') as f:
first_lines = [f.readline().strip() for _ in range(3)]
logging.info(f" First 3 lines: {first_lines}")
except Exception as e:
logging.error(f" Error reading {file_path}: {e}")
else:
logging.info(f"❌ Missing: {file_path}")
logging.info("=== END FILE SYSTEM DEBUG ===")
def load_existing_channels(channels_file):
"""Load existing channels from channels.txt file."""
channels = []
if not os.path.exists(channels_file):
logging.info(f"No existing channels file found: {channels_file}")
return channels
try:
with open(channels_file, 'r', encoding='utf-8') as f:
content = f.read()
# Split into channel blocks
blocks = content.split('\n\n')
for block in blocks:
if not block.strip():
continue
# Parse channel block
channel_data = {}
lines = block.strip().split('\n')
for line in lines:
if '=' in line:
key, value = line.split('=', 1)
channel_data[key.strip()] = value.strip()
if channel_data and channel_data.get('Stream name'):
channels.append(channel_data)
logging.info(f"Loaded {len(channels)} existing channels")
except Exception as e:
logging.error(f"Error loading existing channels: {e}")
return channels
def save_channels_to_file(channels, filename):
"""Save channels to file in proper format."""
try:
with open(filename, 'w', encoding='utf-8') as f:
for i, channel in enumerate(channels):
if i > 0:
f.write("\n\n")
f.write(f"Group = {channel.get('Group', 'Uncategorized')}\n")
f.write(f"Stream name = {channel.get('Stream name', 'Unknown')}\n")
f.write(f"Logo = {channel.get('Logo', '')}\n")
f.write(f"EPG id = {channel.get('EPG id', '')}\n")
f.write(f"Stream URL = {channel.get('Stream URL', '')}\n")
logging.info(f"Successfully saved {len(channels)} channels to {filename}")
return True
except Exception as e:
logging.error(f"Error saving channels to {filename}: {e}")
return False
def generate_m3u_playlist(channels, playlist_file):
"""Generate M3U playlist from channels."""
try:
with open(playlist_file, 'w', encoding='utf-8') as f:
f.write('#EXTM3U\n')
valid_channels = 0
country_stats = {}
for channel in channels:
stream_name = channel.get('Stream name', '')
group = channel.get('Group', 'Uncategorized')
logo = channel.get('Logo', '')
epg_id = channel.get('EPG id', '')
url = channel.get('Stream URL', '')
if stream_name and url:
f.write(f'#EXTINF:-1 group-title="{group}"')
if logo:
f.write(f' tvg-logo="{logo}"')
if epg_id:
f.write(f' tvg-id="{epg_id}"')
f.write(f',{stream_name}\n')
f.write(f'{url}\n')
valid_channels += 1
# Count by country
country_stats[group] = country_stats.get(group, 0) + 1
logging.info(f"Generated M3U playlist with {valid_channels} channels across {len(country_stats)} groups")
return valid_channels, country_stats
except Exception as e:
logging.error(f"Error generating M3U playlist: {e}")
return 0, {}
def generate_playlist():
"""Main playlist generation function with enhanced debugging."""
try:
setup_logging()
logging.info("🚀 Starting enhanced playlist generation...")
# Debug file system first
debug_file_system()
# Initialize configuration
logging.info("📋 Initializing configuration...")
config = ConfigManager()
# Debug config
logging.info(f"Config channels_file: {config.channels_file}")
logging.info(f"Config import_file: {config.import_file}")
# Initialize processor
processor = ChannelProcessor(config)
# Statistics tracking
stats = {
'total_channels': 0,
'valid_channels': 0,
'imported_channels': 0,
'countries_detected': 0,
'country_distribution': {}
}
# Step 1: Create backup if channels.txt exists
logging.info("=== STEP 1: Creating backup ===")
if os.path.exists('channels.txt'):
try:
backup_name = f"channels_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
import shutil
shutil.copy2('channels.txt', backup_name)
logging.info(f"✅ Created backup: {backup_name}")
except Exception as e:
logging.warning(f"Could not create backup: {e}")
# Step 2: Clean existing corrupted entries
logging.info("=== STEP 2: Cleaning corrupted channels ===")
try:
processor.clean_corrupted_channels()
logging.info("✅ Corruption cleanup completed")
except Exception as e:
logging.warning(f"Corruption cleanup error: {e}")
# Step 3: Force update existing channels with new country detection
logging.info("=== STEP 3: Updating existing channels with enhanced country detection ===")
try:
processor.update_existing_channels_with_country_detection()
logging.info("✅ Country detection update completed")
except Exception as e:
logging.warning(f"Country detection update error: {e}")
# Step 4: Process imports
logging.info("=== STEP 4: Processing imports ===")
# Check for import file
if os.path.exists('bulk_import.m3u'):
logging.info(f"✅ Found bulk_import.m3u")
try:
with open('bulk_import.m3u', 'r', encoding='utf-8') as f:
content = f.read()
logging.info(f"Import file has {len(content)} characters")
if len(content.strip()) > 10: # More than just #EXTM3U
imported_channels = processor.process_import()
stats['imported_channels'] = len(imported_channels)
logging.info(f"✅ Imported {len(imported_channels)} new channels")
else:
logging.info("Import file is empty, skipping import")
except Exception as e:
logging.error(f"Error processing import: {e}")
else:
logging.info("No import file found, skipping import")
# Step 5: Load all channels
logging.info("=== STEP 5: Loading all channels ===")
all_channels = load_existing_channels('channels.txt')
stats['total_channels'] = len(all_channels)
logging.info(f"✅ Loaded {len(all_channels)} total channels")
# Step 6: Remove duplicates
logging.info("=== STEP 6: Removing duplicates ===")
try:
unique_channels = processor.remove_duplicates_optimized(all_channels)
duplicates_removed = len(all_channels) - len(unique_channels)
logging.info(f"✅ After deduplication: {len(unique_channels)} channels ({duplicates_removed} duplicates removed)")
except Exception as e:
logging.warning(f"Deduplication error: {e}, using original channels")
unique_channels = all_channels
# Step 7: Sort channels by group and name
logging.info("=== STEP 7: Sorting channels ===")
try:
unique_channels.sort(key=lambda x: (x.get('Group', '').lower(), x.get('Stream name', '').lower()))
logging.info("✅ Channels sorted by group and name")
except Exception as e:
logging.warning(f"Sorting error: {e}")
# Step 8: Save updated channels
logging.info("=== STEP 8: Saving updated channels ===")
if save_channels_to_file(unique_channels, 'channels.txt'):
logging.info("✅ Successfully saved updated channels.txt")
else:
logging.error("❌ Failed to save channels.txt")
# Step 9: Generate M3U playlist
logging.info("=== STEP 9: Generating M3U playlist ===")
valid_channels, country_stats = generate_m3u_playlist(unique_channels, 'playlist.m3u')
stats['valid_channels'] = valid_channels
stats['country_distribution'] = country_stats
stats['countries_detected'] = len(country_stats)
# Step 10: Generate summary report
logging.info("=== STEP 10: Generating summary report ===")
# Show top countries
sorted_countries = sorted(country_stats.items(), key=lambda x: x[1], reverse=True)
logging.info("🌍 Top Countries/Groups:")
for country, count in sorted_countries[:10]:
percentage = (count / valid_channels * 100) if valid_channels > 0 else 0
logging.info(f" {country}: {count} channels ({percentage:.1f}%)")
# Final summary
logging.info("🎉 PLAYLIST GENERATION COMPLETED SUCCESSFULLY!")
logging.info(f"📊 Final Statistics:")
logging.info(f" 📺 Total channels processed: {stats['total_channels']}")
logging.info(f" ✅ Valid channels in playlist: {stats['valid_channels']}")
logging.info(f" 📥 New channels imported: {stats['imported_channels']}")
logging.info(f" 🌍 Countries/groups detected: {stats['countries_detected']}")
# Final debug
logging.info("=== FINAL FILE CHECK ===")
debug_file_system()
return True
except Exception as e:
logging.error(f"❌ Fatal error in playlist generation: {e}")
import traceback
logging.error(traceback.format_exc())
return False
if __name__ == "__main__":
success = generate_playlist()
exit(0 if success else 1)