Update scripts/generate_playlist.py

stoney420 2025-06-27 20:00:13 +02:00
parent 15ae2ac453
commit b38d15a718


@@ -367,7 +367,7 @@ def update_existing_channels_with_country_detection():
     log_message(f"FORCE updated ALL {len(updated_channels)} channels ({changes} changes made)", "INFO")
 
 def process_import():
-    """Process bulk M3U import with comprehensive filtering."""
+    """Process bulk M3U import with ROBUST handling of malformed files."""
     settings = load_settings()
     group_overrides = load_group_overrides()
@@ -375,60 +375,129 @@ def process_import():
         log_message(f"No {IMPORT_FILE} found, skipping import", "INFO")
         return []
 
-    log_message(f"Processing {IMPORT_FILE} for comprehensive bulk import...", "INFO")
+    log_message(f"Processing {IMPORT_FILE} with ROBUST parsing...", "INFO")
 
     stats = {
         'total_lines': 0, 'extinf_lines': 0, 'parsed': 0, 'valid': 0,
         'filtered_adult': 0, 'filtered_invalid': 0, 'duplicates': 0,
-        'already_existed': 0, 'final_imported': 0
+        'already_existed': 0, 'final_imported': 0, 'malformed_fixed': 0
     }
 
     imported_channels = []
 
     try:
         with open(IMPORT_FILE, 'r', encoding='utf-8') as f:
-            lines = f.readlines()
+            content = f.read()
+
+        # Pre-process the content to fix common issues
+        log_message("Pre-processing M3U content to fix common issues...", "INFO")
+
+        # Fix missing newlines between entries
+        content = re.sub(r'(https?://[^\s]+)(#EXTINF)', r'\1\n\2', content)
+        content = re.sub(r'(\.m3u8?)(#EXTINF)', r'\1\n\2', content)
+        content = re.sub(r'(\.ts)(#EXTINF)', r'\1\n\2', content)
+
+        # Split into lines after fixing
+        lines = content.split('\n')
 
         stats['total_lines'] = len(lines)
-        log_message(f"Processing {len(lines)} lines...", "INFO")
+        log_message(f"Processing {len(lines)} lines after pre-processing...", "INFO")
 
         i = 0
         while i < len(lines):
             line = lines[i].strip()
 
             if line.startswith('#EXTINF:'):
                 stats['extinf_lines'] += 1
-                if i + 1 < len(lines):
-                    extinf_line = line
-                    url_line = lines[i+1].strip()
-                    if url_line and not url_line.startswith('#'):
-                        channel = parse_m3u_entry(extinf_line, url_line)
-                        stats['parsed'] += 1
-                        is_valid, reason = validate_channel(channel, settings)
-                        if not is_valid:
-                            if "adult" in reason.lower():
-                                stats['filtered_adult'] += 1
-                            else:
-                                stats['filtered_invalid'] += 1
-                            i += 2
-                            continue
-                        channel = apply_auto_country_detection(channel, group_overrides, settings)
-                        imported_channels.append(channel)
-                        stats['valid'] += 1
-                        i += 2
-                    else:
-                        i += 1
-                else:
-                    i += 1
-            else:
-                i += 1
+                extinf_line = line
+                url_line = ""
+
+                # Look for the URL in the next few lines (robust search)
+                j = i + 1
+                while j < len(lines) and j < i + 5:  # Look ahead max 5 lines
+                    potential_url = lines[j].strip()
+
+                    # Skip empty lines and comments
+                    if not potential_url or potential_url.startswith('#'):
+                        j += 1
+                        continue
+
+                    # Clean potential URL
+                    if '#EXTINF' in potential_url:
+                        # Split on #EXTINF and take the first part
+                        url_parts = potential_url.split('#EXTINF')
+                        potential_url = url_parts[0].strip()
+                        # Put the EXTINF part back for next iteration
+                        if len(url_parts) > 1:
+                            lines[j] = '#EXTINF' + url_parts[1]
+                            stats['malformed_fixed'] += 1
+
+                    # Check if it looks like a URL
+                    if (potential_url.startswith(('http://', 'https://', 'rtmp://', 'rtmps://')) or
+                            potential_url.endswith(('.m3u8', '.ts', '.mp4')) or
+                            '/' in potential_url):
+                        url_line = potential_url
+                        i = j  # Update our position
+                        break
+
+                    j += 1
+
+                # If we found a URL, process the channel
+                if url_line:
+                    try:
+                        channel = parse_m3u_entry(extinf_line, url_line)
+                        stats['parsed'] += 1
+
+                        # Additional URL cleaning
+                        stream_url = channel.get('Stream URL', '').strip()
+
+                        # Remove any trailing garbage
+                        if ' ' in stream_url:
+                            url_parts = stream_url.split()
+                            for part in url_parts:
+                                if (part.startswith(('http://', 'https://', 'rtmp://')) or
+                                        part.endswith(('.m3u8', '.ts', '.mp4'))):
+                                    channel['Stream URL'] = part
+                                    break
+
+                        # Validate the channel
+                        is_valid, reason = validate_channel(channel, settings)
+                        if not is_valid:
+                            if "adult" in reason.lower():
+                                stats['filtered_adult'] += 1
+                            else:
+                                stats['filtered_invalid'] += 1
+                            log_message(f"Filtered: {channel.get('Stream name')} - {reason}", "DEBUG")
+                            i += 1
+                            continue
+
+                        # Apply country detection
+                        channel = apply_auto_country_detection(channel, group_overrides, settings)
+                        imported_channels.append(channel)
+                        stats['valid'] += 1
+                        log_message(f"Successfully imported: {channel.get('Stream name')} → {channel.get('Group')}", "DEBUG")
+
+                    except Exception as e:
+                        log_message(f"Error processing channel: {e}", "WARNING")
+                        i += 1
+                        continue
+                else:
+                    log_message(f"No URL found for: {extinf_line[:50]}...", "WARNING")
+                    i += 1
+                    continue
+
+            i += 1
+
+        # Continue with duplicate removal and file writing...
         if imported_channels:
+            log_message(f"Pre-duplicate removal: {len(imported_channels)} channels", "INFO")
             original_count = len(imported_channels)
             imported_channels = remove_duplicates(imported_channels, settings)
             stats['duplicates'] = original_count - len(imported_channels)
 
+            # Check against existing channels
             existing_channels = []
             if os.path.exists(CHANNELS_FILE):
                 with open(CHANNELS_FILE, 'r', encoding='utf-8') as f:
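The three re.sub() calls in the hunk above are the core of the "robust" handling: they restore the newline that is often missing between a stream URL and the next #EXTINF header when playlists get concatenated or saved without line endings. A minimal sketch of the effect, reusing the same substitutions on a made-up two-entry playlist (the sample content is illustrative, not from the repository):

import re

# Hypothetical glued playlist: the second #EXTINF header sits on the
# same line as the first stream URL.
content = (
    '#EXTINF:-1 group-title="News",Channel One\n'
    'http://example.com/one.m3u8#EXTINF:-1 group-title="Sports",Channel Two\n'
    'http://example.com/two.ts'
)

# Same pre-processing substitutions as in process_import()
content = re.sub(r'(https?://[^\s]+)(#EXTINF)', r'\1\n\2', content)
content = re.sub(r'(\.m3u8?)(#EXTINF)', r'\1\n\2', content)
content = re.sub(r'(\.ts)(#EXTINF)', r'\1\n\2', content)

print(content.split('\n'))
# ['#EXTINF:-1 group-title="News",Channel One',
#  'http://example.com/one.m3u8',
#  '#EXTINF:-1 group-title="Sports",Channel Two',
#  'http://example.com/two.ts']

After the substitution the glued header lands on its own line, so the line-oriented loop sees a normal #EXTINF/URL pair and the look-ahead search rarely needs more than one step.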
@@ -450,21 +519,31 @@ def process_import():
         stats['final_imported'] = len(imported_channels)
 
+        # Write to file
         if imported_channels:
+            log_message(f"Writing {len(imported_channels)} new channels to file...", "INFO")
+
+            # Check if file exists and has content
+            file_exists = os.path.exists(CHANNELS_FILE) and os.path.getsize(CHANNELS_FILE) > 0
+
             with open(CHANNELS_FILE, 'a', encoding='utf-8') as f:
                 for i, channel in enumerate(imported_channels):
-                    if i > 0 or os.path.getsize(CHANNELS_FILE) > 0:
+                    if i > 0 or file_exists:
                         f.write("\n\n")
                     f.write(convert_to_channels_txt_block(channel))
+
+            log_message(f"Successfully wrote {len(imported_channels)} channels", "INFO")
 
     except Exception as e:
         log_message(f"Error processing import: {e}", "ERROR")
 
-    log_message("=== COMPREHENSIVE IMPORT STATISTICS ===", "INFO")
+    # Enhanced statistics
+    log_message("=== ROBUST IMPORT STATISTICS ===", "INFO")
     for key, value in stats.items():
         log_message(f"{key.replace('_', ' ').title()}: {value}", "INFO")
     log_message("=== END STATISTICS ===", "INFO")
 
+    # Cleanup
     if settings.get('auto_cleanup_import', True):
         try:
             os.remove(IMPORT_FILE)
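For testing the new look-ahead logic in isolation, the URL search can be read as a standalone helper that mirrors the loop added in this commit. A sketch under that reading; the function name, return convention, and sample data are hypothetical, not part of scripts/generate_playlist.py:

def find_stream_url(lines, i, max_lookahead=5):
    """Scan the lines after an #EXTINF header for a URL-shaped token,
    mirroring the robust search in process_import()."""
    j = i + 1
    while j < len(lines) and j < i + max_lookahead:
        candidate = lines[j].strip()

        # Skip blank lines and comments, as the import loop does
        if not candidate or candidate.startswith('#'):
            j += 1
            continue

        # If the next #EXTINF header is glued onto the URL, split it off
        # and push it back so the caller sees it on a later pass
        if '#EXTINF' in candidate:
            head, _, tail = candidate.partition('#EXTINF')
            candidate = head.strip()
            lines[j] = '#EXTINF' + tail

        # Accept anything URL-shaped: known scheme, known extension,
        # or at least a path separator
        if (candidate.startswith(('http://', 'https://', 'rtmp://', 'rtmps://'))
                or candidate.endswith(('.m3u8', '.ts', '.mp4'))
                or '/' in candidate):
            return candidate, j
        j += 1
    return "", i

lines = ['#EXTINF:-1,Channel One', '', 'http://example.com/live.m3u8']
url, pos = find_stream_url(lines, 0)
print(url, pos)  # http://example.com/live.m3u8 2

Returning the index alongside the URL reproduces the i = j position update in process_import(), and pushing the split-off #EXTINF tail back into lines[j] is the step the real function counts in stats['malformed_fixed'].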