# scripts/source_scraper.py """ Automated Channel Discovery for IPTV Playlist Generator Integrates seamlessly with existing architecture """ import requests import json import os import logging from datetime import datetime from typing import List, Dict, Set import re from urllib.parse import urlparse class SourceScraper: def __init__(self): self.setup_logging() self.load_config() self.discovered_channels = [] self.source_stats = {} def setup_logging(self): """Setup logging consistent with existing system""" logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) self.logger = logging.getLogger(__name__) def load_config(self): """Load discovery sources configuration""" try: with open('config/discovery_sources.json', 'r') as f: self.config = json.load(f) except FileNotFoundError: # Create default config if it doesn't exist self.config = { "enabled": True, "sources": [ { "name": "IPTV-Org Main", "url": "https://raw.githubusercontent.com/iptv-org/iptv/master/streams/", "type": "github_directory", "country_filter": ["us", "uk", "ca", "au"] }, { "name": "Free-TV Collection", "url": "https://raw.githubusercontent.com/Free-TV/IPTV/master/playlist.m3u8", "type": "m3u_playlist", "quality_filter": ["hd", "fhd", "4k"] } ], "filters": { "min_quality": "sd", "exclude_adult": True, "max_channels_per_source": 100, "require_country_detection": False }, "rate_limiting": { "delay_between_requests": 1.0, "max_retries": 3 } } os.makedirs('config', exist_ok=True) with open('config/discovery_sources.json', 'w') as f: json.dump(self.config, f, indent=2) def discover_from_m3u_url(self, source_info: Dict) -> List[str]: """Discover channels from M3U playlist URL""" try: self.logger.info(f"Discovering from M3U: {source_info['name']}") response = requests.get(source_info['url'], timeout=30) response.raise_for_status() content = response.text channels = [] # Parse M3U content lines = content.split('\n') current_extinf = None for line in lines: line = line.strip() if line.startswith('#EXTINF:'): current_extinf = line elif line.startswith('http') and current_extinf: # We have a complete channel entry channels.append(f"{current_extinf}\n{line}") current_extinf = None self.logger.info(f"Found {len(channels)} channels from {source_info['name']}") self.source_stats[source_info['name']] = len(channels) return channels[:self.config['filters']['max_channels_per_source']] except Exception as e: self.logger.error(f"Error discovering from {source_info['name']}: {e}") return [] def discover_from_github_directory(self, source_info: Dict) -> List[str]: """Discover channels from GitHub directory structure""" try: self.logger.info(f"Discovering from GitHub: {source_info['name']}") base_url = source_info['url'] channels = [] # Try common country codes from your existing patterns country_codes = source_info.get('country_filter', ['us', 'uk', 'ca', 'de', 'fr']) for country in country_codes: try: url = f"{base_url}{country}.m3u" response = requests.get(url, timeout=15) if response.status_code == 200: content = response.text lines = content.split('\n') current_extinf = None for line in lines: line = line.strip() if line.startswith('#EXTINF:'): current_extinf = line elif line.startswith('http') and current_extinf: channels.append(f"{current_extinf}\n{line}") current_extinf = None self.logger.info(f"Found {len(channels)} channels for {country}") except Exception as e: self.logger.debug(f"No channels found for {country}: {e}") continue self.source_stats[source_info['name']] = len(channels) return channels[:self.config['filters']['max_channels_per_source']] except Exception as e: self.logger.error(f"Error discovering from GitHub {source_info['name']}: {e}") return [] def filter_channels(self, channels: List[str]) -> List[str]: """Apply quality and content filters""" if not self.config['filters']['exclude_adult']: return channels # Load adult keywords from existing config try: with open('config/patterns.json', 'r') as f: patterns = json.load(f) adult_keywords = patterns.get('adult_keywords', []) except: adult_keywords = ['xxx', 'adult', 'porn', '+18'] filtered = [] for channel in channels: # Check if channel contains adult content channel_lower = channel.lower() if not any(keyword in channel_lower for keyword in adult_keywords): filtered.append(channel) self.logger.info(f"Filtered {len(channels) - len(filtered)} adult channels") return filtered def deduplicate_with_existing(self, new_channels: List[str]) -> List[str]: """Remove channels that already exist in channels.txt""" if not os.path.exists('channels.txt'): return new_channels # Read existing channel URLs existing_urls = set() try: with open('channels.txt', 'r', encoding='utf-8') as f: content = f.read() # Extract URLs from existing channels.txt url_pattern = r'Stream URL\s*=\s*(.+)' existing_urls = set(re.findall(url_pattern, content)) except Exception as e: self.logger.warning(f"Could not read existing channels: {e}") # Filter out duplicates unique_channels = [] for channel in new_channels: lines = channel.split('\n') if len(lines) >= 2: url = lines[1].strip() if url not in existing_urls: unique_channels.append(channel) self.logger.info(f"Removed {len(new_channels) - len(unique_channels)} duplicate channels") return unique_channels def append_to_bulk_import(self, channels: List[str]): """Append discovered channels to bulk_import.m3u""" if not channels: self.logger.info("No new channels to add") return # Read existing bulk_import content existing_content = "" if os.path.exists('bulk_import.m3u'): with open('bulk_import.m3u', 'r', encoding='utf-8') as f: existing_content = f.read().strip() # If file is empty or only has header, start fresh if not existing_content or existing_content == '#EXTM3U': existing_content = '#EXTM3U' # Append new channels with open('bulk_import.m3u', 'w', encoding='utf-8') as f: f.write(existing_content) if not existing_content.endswith('\n'): f.write('\n') f.write('\n'.join(channels)) f.write('\n') self.logger.info(f"Added {len(channels)} new channels to bulk_import.m3u") def generate_discovery_report(self): """Generate discovery session report""" timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") report_path = f"reports/daily/discovery_report_{timestamp}.md" os.makedirs('reports/daily', exist_ok=True) total_discovered = sum(self.source_stats.values()) with open(report_path, 'w', encoding='utf-8') as f: f.write(f"# Channel Discovery Report\n") f.write(f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n") f.write(f"## Summary\n") f.write(f"- **Total Channels Discovered:** {total_discovered}\n") f.write(f"- **Sources Checked:** {len(self.config['sources'])}\n") f.write(f"- **Active Sources:** {len(self.source_stats)}\n\n") f.write(f"## Source Breakdown\n") for source_name, count in self.source_stats.items(): f.write(f"- **{source_name}:** {count} channels\n") f.write(f"\n## Configuration\n") f.write(f"- **Max per source:** {self.config['filters']['max_channels_per_source']}\n") f.write(f"- **Adult filter:** {'Enabled' if self.config['filters']['exclude_adult'] else 'Disabled'}\n") f.write(f"- **Quality filter:** {self.config['filters']['min_quality']}\n") f.write(f"\n---\n*Auto-generated by Source Scraper*\n") self.logger.info(f"Discovery report saved: {report_path}") def run_discovery(self): """Main discovery process""" if not self.config['enabled']: self.logger.info("Discovery is disabled in configuration") return self.logger.info("=== Starting Channel Discovery ===") all_discovered = [] for source in self.config['sources']: try: if source['type'] == 'm3u_playlist': channels = self.discover_from_m3u_url(source) elif source['type'] == 'github_directory': channels = self.discover_from_github_directory(source) else: self.logger.warning(f"Unknown source type: {source['type']}") continue all_discovered.extend(channels) # Rate limiting import time time.sleep(self.config['rate_limiting']['delay_between_requests']) except Exception as e: self.logger.error(f"Error processing source {source['name']}: {e}") continue # Apply filters filtered_channels = self.filter_channels(all_discovered) unique_channels = self.deduplicate_with_existing(filtered_channels) # Add to bulk import self.append_to_bulk_import(unique_channels) # Generate report self.generate_discovery_report() self.logger.info(f"=== Discovery Complete: {len(unique_channels)} new channels added ===") if __name__ == "__main__": scraper = SourceScraper() scraper.run_discovery()