my-private-iptv-m3u/scripts/generate_playlist.py
stoney420 1a0cbed4f1
Some checks failed
Generate M3U Playlist / build (push) Has been cancelled
Update scripts/generate_playlist.py
2025-06-28 00:11:19 +02:00

202 lines
No EOL
7.4 KiB
Python

#!/usr/bin/env python3
"""
IPTV Playlist Generator - FIXED to run from root directory
Enhanced debugging to find why channels.txt is empty
"""
import logging
import os
import sys
from datetime import datetime
from pathlib import Path
# FIXED: Change to root directory and add scripts to path
script_dir = Path(__file__).parent
root_dir = script_dir.parent
# Change working directory to root
os.chdir(root_dir)
# Add scripts directory to Python path
sys.path.insert(0, str(script_dir))
# Import our modular components
from config_manager import ConfigManager
from channel_processor import ChannelProcessor
from file_manager import FileManager
from playlist_builder import PlaylistBuilder
from health_checker import HealthChecker
from report_generator import ReportGenerator
def setup_logging():
"""Setup comprehensive logging."""
logging.basicConfig(
level=logging.DEBUG, # Changed to DEBUG for more info
format='[%(asctime)s] %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
handlers=[
logging.FileHandler('playlist_update.log', encoding='utf-8'),
logging.StreamHandler()
]
)
def debug_file_system():
"""Debug the file system to see what files exist."""
logging.info("=== FILE SYSTEM DEBUG ===")
# Check current directory
current_dir = os.getcwd()
logging.info(f"Current working directory: {current_dir}")
# List all files in current directory
try:
files = os.listdir('.')
logging.info(f"Files in current directory: {files}")
except Exception as e:
logging.error(f"Could not list current directory: {e}")
# Check for specific files
files_to_check = [
'bulk_import.m3u',
'channels.txt',
'playlist.m3u'
]
for file_path in files_to_check:
if os.path.exists(file_path):
try:
size = os.path.getsize(file_path)
logging.info(f"✅ Found {file_path} (size: {size} bytes)")
# If it's the import file, show first few lines
if 'bulk_import.m3u' in file_path and size > 0:
with open(file_path, 'r', encoding='utf-8') as f:
first_lines = [f.readline().strip() for _ in range(3)]
logging.info(f" First 3 lines: {first_lines}")
except Exception as e:
logging.error(f" Error reading {file_path}: {e}")
else:
logging.info(f"❌ Missing: {file_path}")
logging.info("=== END FILE SYSTEM DEBUG ===")
def generate_playlist():
"""Main playlist generation function with enhanced debugging."""
try:
setup_logging()
logging.info("Starting DEBUG playlist generation from ROOT directory...")
# Debug file system first
debug_file_system()
# Initialize all modules
logging.info("Initializing modules...")
config = ConfigManager()
# Debug config
logging.info(f"Config channels_file: {config.channels_file}")
logging.info(f"Config import_file: {config.import_file}")
logging.info(f"Config settings: {config.settings}")
file_manager = FileManager(config)
processor = ChannelProcessor(config)
builder = PlaylistBuilder(config)
health_checker = HealthChecker(config)
report_gen = ReportGenerator(config)
# Clear log file
if os.path.exists('playlist_update.log'):
open('playlist_update.log', 'w').close()
# Statistics tracking
stats = {
'total_channels': 0,
'valid_channels': 0,
'duplicates_removed': 0,
'imported_channels': 0,
'countries_detected': 0,
'country_distribution': {}
}
# Step 1: Create backup
logging.info("=== STEP 1: Creating backup ===")
file_manager.create_backup('channels.txt')
# Step 2: Clean existing corrupted entries
logging.info("=== STEP 2: Cleaning corrupted channels ===")
processor.clean_corrupted_channels()
# Step 3: Force update existing channels with new country detection
logging.info("=== STEP 3: Updating existing channels ===")
processor.update_existing_channels_with_country_detection()
# Step 4: Process imports (THE CRITICAL STEP)
logging.info("=== STEP 4: Processing imports ===")
# Manual check for import file in current directory
if os.path.exists('bulk_import.m3u'):
logging.info(f"✅ Found bulk_import.m3u in current directory")
with open('bulk_import.m3u', 'r', encoding='utf-8') as f:
content = f.read()
logging.info(f"Import file has {len(content)} characters")
logging.info(f"Import file first 200 chars: {content[:200]}")
else:
logging.error(f"❌ bulk_import.m3u NOT found in current directory")
imported_channels = processor.process_import()
stats['imported_channels'] = len(imported_channels)
logging.info(f"Import returned {len(imported_channels)} channels")
if len(imported_channels) == 0:
logging.warning("NO CHANNELS IMPORTED! This is the problem.")
# Step 5: Load all channels
logging.info("=== STEP 5: Loading all channels ===")
all_channels = file_manager.load_all_channels()
stats['total_channels'] = len(all_channels)
logging.info(f"Loaded {len(all_channels)} total channels")
# Step 6: Remove duplicates
logging.info("=== STEP 6: Removing duplicates ===")
unique_channels = processor.remove_duplicates_optimized(all_channels)
stats['duplicates_removed'] = len(all_channels) - len(unique_channels)
logging.info(f"After deduplication: {len(unique_channels)} channels")
# Step 7: Sort channels
if config.settings.get('sort_channels', True):
unique_channels.sort(key=lambda x: (x.get('Group', '').lower(), x.get('Stream name', '').lower()))
# Step 8: Health check (optional)
health_results = {}
if config.settings.get('enable_health_check', False):
health_results = health_checker.batch_health_check(unique_channels)
# Step 9: Generate M3U playlist
logging.info("=== STEP 9: Generating M3U ===")
valid_channels, country_stats = builder.generate_m3u(unique_channels)
stats['valid_channels'] = valid_channels
stats['country_distribution'] = country_stats
stats['countries_detected'] = len(country_stats)
# Step 10: Generate report
logging.info("=== STEP 10: Generating report ===")
report_gen.save_report(stats, health_results)
logging.info(f"Playlist generation complete: {valid_channels} channels across {len(country_stats)} countries")
# Final debug
logging.info("=== FINAL DEBUG ===")
debug_file_system()
return True
except Exception as e:
logging.error(f"Fatal error in playlist generation: {e}")
import traceback
logging.error(traceback.format_exc())
return False
if __name__ == "__main__":
success = generate_playlist()
exit(0 if success else 1)