Delete scripts/source_scraper.py

stoney420 2025-06-28 05:41:39 +02:00
parent f9612e7acc
commit 482d4cd7f1


@@ -1,294 +0,0 @@
# scripts/source_scraper.py
"""
Automated Channel Discovery for IPTV Playlist Generator
Integrates seamlessly with existing architecture
"""
import requests
import json
import os
import time
import logging
from datetime import datetime
from typing import List, Dict, Set
import re
from urllib.parse import urlparse

class SourceScraper:
    def __init__(self):
        self.setup_logging()
        self.load_config()
        self.discovered_channels = []
        self.source_stats = {}

    def setup_logging(self):
        """Setup logging consistent with existing system"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

    def load_config(self):
        """Load discovery sources configuration"""
        try:
            with open('config/discovery_sources.json', 'r') as f:
                self.config = json.load(f)
        except FileNotFoundError:
            # Create default config if it doesn't exist
            self.config = {
                "enabled": True,
                "sources": [
                    {
                        "name": "IPTV-Org Main",
                        "url": "https://raw.githubusercontent.com/iptv-org/iptv/master/streams/",
                        "type": "github_directory",
                        "country_filter": ["us", "uk", "ca", "au"]
                    },
                    {
                        "name": "Free-TV Collection",
                        "url": "https://raw.githubusercontent.com/Free-TV/IPTV/master/playlist.m3u8",
                        "type": "m3u_playlist",
                        "quality_filter": ["hd", "fhd", "4k"]
                    }
                ],
                "filters": {
                    "min_quality": "sd",
                    "exclude_adult": True,
                    "max_channels_per_source": 100,
                    "require_country_detection": False
                },
                "rate_limiting": {
                    "delay_between_requests": 1.0,
                    "max_retries": 3
                }
            }
            os.makedirs('config', exist_ok=True)
            with open('config/discovery_sources.json', 'w') as f:
                json.dump(self.config, f, indent=2)

    def discover_from_m3u_url(self, source_info: Dict) -> List[str]:
        """Discover channels from M3U playlist URL"""
        try:
            self.logger.info(f"Discovering from M3U: {source_info['name']}")
            response = requests.get(source_info['url'], timeout=30)
            response.raise_for_status()
            content = response.text
            channels = []

            # Parse M3U content
            lines = content.split('\n')
            current_extinf = None
            for line in lines:
                line = line.strip()
                if line.startswith('#EXTINF:'):
                    current_extinf = line
                elif line.startswith('http') and current_extinf:
                    # We have a complete channel entry
                    channels.append(f"{current_extinf}\n{line}")
                    current_extinf = None

            self.logger.info(f"Found {len(channels)} channels from {source_info['name']}")
            self.source_stats[source_info['name']] = len(channels)
            return channels[:self.config['filters']['max_channels_per_source']]
        except Exception as e:
            self.logger.error(f"Error discovering from {source_info['name']}: {e}")
            return []

    def discover_from_github_directory(self, source_info: Dict) -> List[str]:
        """Discover channels from GitHub directory structure"""
        try:
            self.logger.info(f"Discovering from GitHub: {source_info['name']}")
            base_url = source_info['url']
            channels = []

            # Try common country codes from your existing patterns
            country_codes = source_info.get('country_filter', ['us', 'uk', 'ca', 'de', 'fr'])
            for country in country_codes:
                try:
                    url = f"{base_url}{country}.m3u"
                    response = requests.get(url, timeout=15)
                    if response.status_code == 200:
                        content = response.text
                        lines = content.split('\n')
                        current_extinf = None
                        found_before = len(channels)
                        for line in lines:
                            line = line.strip()
                            if line.startswith('#EXTINF:'):
                                current_extinf = line
                            elif line.startswith('http') and current_extinf:
                                channels.append(f"{current_extinf}\n{line}")
                                current_extinf = None
                        # Log only this country's additions, not the running total
                        self.logger.info(f"Found {len(channels) - found_before} channels for {country}")
                except Exception as e:
                    self.logger.debug(f"No channels found for {country}: {e}")
                    continue

            self.source_stats[source_info['name']] = len(channels)
            return channels[:self.config['filters']['max_channels_per_source']]
        except Exception as e:
            self.logger.error(f"Error discovering from GitHub {source_info['name']}: {e}")
            return []

    def filter_channels(self, channels: List[str]) -> List[str]:
        """Apply quality and content filters"""
        if not self.config['filters']['exclude_adult']:
            return channels

        # Load adult keywords from existing config
        try:
            with open('config/patterns.json', 'r') as f:
                patterns = json.load(f)
            adult_keywords = patterns.get('adult_keywords', [])
        except (FileNotFoundError, json.JSONDecodeError):
            adult_keywords = ['xxx', 'adult', 'porn', '+18']

        filtered = []
        for channel in channels:
            # Check if channel contains adult content
            channel_lower = channel.lower()
            if not any(keyword in channel_lower for keyword in adult_keywords):
                filtered.append(channel)

        self.logger.info(f"Filtered {len(channels) - len(filtered)} adult channels")
        return filtered

    def deduplicate_with_existing(self, new_channels: List[str]) -> List[str]:
        """Remove channels that already exist in channels.txt"""
        if not os.path.exists('channels.txt'):
            return new_channels

        # Read existing channel URLs
        existing_urls = set()
        try:
            with open('channels.txt', 'r', encoding='utf-8') as f:
                content = f.read()
            # Extract URLs from existing channels.txt
            url_pattern = r'Stream URL\s*=\s*(.+)'
            existing_urls = set(re.findall(url_pattern, content))
        except Exception as e:
            self.logger.warning(f"Could not read existing channels: {e}")

        # Filter out duplicates
        unique_channels = []
        for channel in new_channels:
            lines = channel.split('\n')
            if len(lines) >= 2:
                url = lines[1].strip()
                if url not in existing_urls:
                    unique_channels.append(channel)

        self.logger.info(f"Removed {len(new_channels) - len(unique_channels)} duplicate channels")
        return unique_channels

    def append_to_bulk_import(self, channels: List[str]):
        """Append discovered channels to bulk_import.m3u"""
        if not channels:
            self.logger.info("No new channels to add")
            return

        # Read existing bulk_import content
        existing_content = ""
        if os.path.exists('bulk_import.m3u'):
            with open('bulk_import.m3u', 'r', encoding='utf-8') as f:
                existing_content = f.read().strip()

        # If file is empty or only has header, start fresh
        if not existing_content or existing_content == '#EXTM3U':
            existing_content = '#EXTM3U'

        # Append new channels
        with open('bulk_import.m3u', 'w', encoding='utf-8') as f:
            f.write(existing_content)
            if not existing_content.endswith('\n'):
                f.write('\n')
            f.write('\n'.join(channels))
            f.write('\n')

        self.logger.info(f"Added {len(channels)} new channels to bulk_import.m3u")

    def generate_discovery_report(self):
        """Generate discovery session report"""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        report_path = f"reports/daily/discovery_report_{timestamp}.md"
        os.makedirs('reports/daily', exist_ok=True)

        total_discovered = sum(self.source_stats.values())

        with open(report_path, 'w', encoding='utf-8') as f:
            f.write(f"# Channel Discovery Report\n")
            f.write(f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
            f.write(f"## Summary\n")
            f.write(f"- **Total Channels Discovered:** {total_discovered}\n")
            f.write(f"- **Sources Checked:** {len(self.config['sources'])}\n")
            f.write(f"- **Active Sources:** {len(self.source_stats)}\n\n")
            f.write(f"## Source Breakdown\n")
            for source_name, count in self.source_stats.items():
                f.write(f"- **{source_name}:** {count} channels\n")
            f.write(f"\n## Configuration\n")
            f.write(f"- **Max per source:** {self.config['filters']['max_channels_per_source']}\n")
            f.write(f"- **Adult filter:** {'Enabled' if self.config['filters']['exclude_adult'] else 'Disabled'}\n")
            f.write(f"- **Quality filter:** {self.config['filters']['min_quality']}\n")
            f.write(f"\n---\n*Auto-generated by Source Scraper*\n")

        self.logger.info(f"Discovery report saved: {report_path}")

    def run_discovery(self):
        """Main discovery process"""
        if not self.config['enabled']:
            self.logger.info("Discovery is disabled in configuration")
            return

        self.logger.info("=== Starting Channel Discovery ===")
        all_discovered = []

        for source in self.config['sources']:
            try:
                if source['type'] == 'm3u_playlist':
                    channels = self.discover_from_m3u_url(source)
                elif source['type'] == 'github_directory':
                    channels = self.discover_from_github_directory(source)
                else:
                    self.logger.warning(f"Unknown source type: {source['type']}")
                    continue

                all_discovered.extend(channels)

                # Rate limiting between sources
                time.sleep(self.config['rate_limiting']['delay_between_requests'])
            except Exception as e:
                self.logger.error(f"Error processing source {source['name']}: {e}")
                continue

        # Apply filters
        filtered_channels = self.filter_channels(all_discovered)
        unique_channels = self.deduplicate_with_existing(filtered_channels)

        # Add to bulk import
        self.append_to_bulk_import(unique_channels)

        # Generate report
        self.generate_discovery_report()

        self.logger.info(f"=== Discovery Complete: {len(unique_channels)} new channels added ===")

if __name__ == "__main__":
    scraper = SourceScraper()
    scraper.run_discovery()
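
For context, the deleted script was self-contained and intended to run standalone from the repository root. A plausible invocation (not recorded in this commit) would have been:

    python scripts/source_scraper.py

On first run it creates config/discovery_sources.json with the defaults shown above, then appends newly discovered, non-duplicate channels to bulk_import.m3u and writes a Markdown report under reports/daily/.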