diff --git a/scripts/source_scraper.py b/scripts/source_scraper.py
deleted file mode 100644
index 6c6534c..0000000
--- a/scripts/source_scraper.py
+++ /dev/null
@@ -1,294 +0,0 @@
-# scripts/source_scraper.py
-"""
-Automated Channel Discovery for IPTV Playlist Generator
-Integrates seamlessly with existing architecture
-"""
-
-import requests
-import json
-import os
-import logging
-from datetime import datetime
-from typing import List, Dict, Set
-import re
-from urllib.parse import urlparse
-
-class SourceScraper:
-    def __init__(self):
-        self.setup_logging()
-        self.load_config()
-        self.discovered_channels = []
-        self.source_stats = {}
-
-    def setup_logging(self):
-        """Setup logging consistent with existing system"""
-        logging.basicConfig(
-            level=logging.INFO,
-            format='%(asctime)s - %(levelname)s - %(message)s'
-        )
-        self.logger = logging.getLogger(__name__)
-
-    def load_config(self):
-        """Load discovery sources configuration"""
-        try:
-            with open('config/discovery_sources.json', 'r') as f:
-                self.config = json.load(f)
-        except FileNotFoundError:
-            # Create default config if it doesn't exist
-            self.config = {
-                "enabled": True,
-                "sources": [
-                    {
-                        "name": "IPTV-Org Main",
-                        "url": "https://raw.githubusercontent.com/iptv-org/iptv/master/streams/",
-                        "type": "github_directory",
-                        "country_filter": ["us", "uk", "ca", "au"]
-                    },
-                    {
-                        "name": "Free-TV Collection",
-                        "url": "https://raw.githubusercontent.com/Free-TV/IPTV/master/playlist.m3u8",
-                        "type": "m3u_playlist",
-                        "quality_filter": ["hd", "fhd", "4k"]
-                    }
-                ],
-                "filters": {
-                    "min_quality": "sd",
-                    "exclude_adult": True,
-                    "max_channels_per_source": 100,
-                    "require_country_detection": False
-                },
-                "rate_limiting": {
-                    "delay_between_requests": 1.0,
-                    "max_retries": 3
-                }
-            }
-            os.makedirs('config', exist_ok=True)
-            with open('config/discovery_sources.json', 'w') as f:
-                json.dump(self.config, f, indent=2)
-
-    def discover_from_m3u_url(self, source_info: Dict) -> List[str]:
-        """Discover channels from M3U playlist URL"""
-        try:
-            self.logger.info(f"Discovering from M3U: {source_info['name']}")
-
-            response = requests.get(source_info['url'], timeout=30)
-            response.raise_for_status()
-
-            content = response.text
-            channels = []
-
-            # Parse M3U content
-            lines = content.split('\n')
-            current_extinf = None
-
-            for line in lines:
-                line = line.strip()
-                if line.startswith('#EXTINF:'):
-                    current_extinf = line
-                elif line.startswith('http') and current_extinf:
-                    # We have a complete channel entry
-                    channels.append(f"{current_extinf}\n{line}")
-                    current_extinf = None
-
-            self.logger.info(f"Found {len(channels)} channels from {source_info['name']}")
-            self.source_stats[source_info['name']] = len(channels)
-
-            return channels[:self.config['filters']['max_channels_per_source']]
-
-        except Exception as e:
-            self.logger.error(f"Error discovering from {source_info['name']}: {e}")
-            return []
-
-    def discover_from_github_directory(self, source_info: Dict) -> List[str]:
-        """Discover channels from GitHub directory structure"""
-        try:
-            self.logger.info(f"Discovering from GitHub: {source_info['name']}")
-
-            base_url = source_info['url']
-            channels = []
-
-            # Try common country codes from your existing patterns
-            country_codes = source_info.get('country_filter', ['us', 'uk', 'ca', 'de', 'fr'])
-
-            for country in country_codes:
-                try:
-                    url = f"{base_url}{country}.m3u"
-                    response = requests.get(url, timeout=15)
-
-                    if response.status_code == 200:
-                        content = response.text
-                        lines = content.split('\n')
-                        current_extinf = None
-
-                        for line in lines:
-                            line = line.strip()
-                            if line.startswith('#EXTINF:'):
-                                current_extinf = line
-                            elif line.startswith('http') and current_extinf:
-                                channels.append(f"{current_extinf}\n{line}")
-                                current_extinf = None
-
-                        self.logger.info(f"Found {len(channels)} channels for {country}")
-
-                except Exception as e:
-                    self.logger.debug(f"No channels found for {country}: {e}")
-                    continue
-
-            self.source_stats[source_info['name']] = len(channels)
-            return channels[:self.config['filters']['max_channels_per_source']]
-
-        except Exception as e:
-            self.logger.error(f"Error discovering from GitHub {source_info['name']}: {e}")
-            return []
-
-    def filter_channels(self, channels: List[str]) -> List[str]:
-        """Apply quality and content filters"""
-        if not self.config['filters']['exclude_adult']:
-            return channels
-
-        # Load adult keywords from existing config
-        try:
-            with open('config/patterns.json', 'r') as f:
-                patterns = json.load(f)
-                adult_keywords = patterns.get('adult_keywords', [])
-        except:
-            adult_keywords = ['xxx', 'adult', 'porn', '+18']
-
-        filtered = []
-        for channel in channels:
-            # Check if channel contains adult content
-            channel_lower = channel.lower()
-            if not any(keyword in channel_lower for keyword in adult_keywords):
-                filtered.append(channel)
-
-        self.logger.info(f"Filtered {len(channels) - len(filtered)} adult channels")
-        return filtered
-
-    def deduplicate_with_existing(self, new_channels: List[str]) -> List[str]:
-        """Remove channels that already exist in channels.txt"""
-        if not os.path.exists('channels.txt'):
-            return new_channels
-
-        # Read existing channel URLs
-        existing_urls = set()
-        try:
-            with open('channels.txt', 'r', encoding='utf-8') as f:
-                content = f.read()
-                # Extract URLs from existing channels.txt
-                url_pattern = r'Stream URL\s*=\s*(.+)'
-                existing_urls = set(re.findall(url_pattern, content))
-        except Exception as e:
-            self.logger.warning(f"Could not read existing channels: {e}")
-
-        # Filter out duplicates
-        unique_channels = []
-        for channel in new_channels:
-            lines = channel.split('\n')
-            if len(lines) >= 2:
-                url = lines[1].strip()
-                if url not in existing_urls:
-                    unique_channels.append(channel)
-
-        self.logger.info(f"Removed {len(new_channels) - len(unique_channels)} duplicate channels")
-        return unique_channels
-
-    def append_to_bulk_import(self, channels: List[str]):
-        """Append discovered channels to bulk_import.m3u"""
-        if not channels:
-            self.logger.info("No new channels to add")
-            return
-
-        # Read existing bulk_import content
-        existing_content = ""
-        if os.path.exists('bulk_import.m3u'):
-            with open('bulk_import.m3u', 'r', encoding='utf-8') as f:
-                existing_content = f.read().strip()
-
-        # If file is empty or only has header, start fresh
-        if not existing_content or existing_content == '#EXTM3U':
-            existing_content = '#EXTM3U'
-
-        # Append new channels
-        with open('bulk_import.m3u', 'w', encoding='utf-8') as f:
-            f.write(existing_content)
-            if not existing_content.endswith('\n'):
-                f.write('\n')
-            f.write('\n'.join(channels))
-            f.write('\n')
-
-        self.logger.info(f"Added {len(channels)} new channels to bulk_import.m3u")
-
-    def generate_discovery_report(self):
-        """Generate discovery session report"""
-        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
-        report_path = f"reports/daily/discovery_report_{timestamp}.md"
-
-        os.makedirs('reports/daily', exist_ok=True)
-
-        total_discovered = sum(self.source_stats.values())
-
-        with open(report_path, 'w', encoding='utf-8') as f:
-            f.write(f"# Channel Discovery Report\n")
-            f.write(f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
-            f.write(f"## Summary\n")
-            f.write(f"- **Total Channels Discovered:** {total_discovered}\n")
-            f.write(f"- **Sources Checked:** {len(self.config['sources'])}\n")
-            f.write(f"- **Active Sources:** {len(self.source_stats)}\n\n")
-            f.write(f"## Source Breakdown\n")
-
-            for source_name, count in self.source_stats.items():
-                f.write(f"- **{source_name}:** {count} channels\n")
-
-            f.write(f"\n## Configuration\n")
-            f.write(f"- **Max per source:** {self.config['filters']['max_channels_per_source']}\n")
-            f.write(f"- **Adult filter:** {'Enabled' if self.config['filters']['exclude_adult'] else 'Disabled'}\n")
-            f.write(f"- **Quality filter:** {self.config['filters']['min_quality']}\n")
-            f.write(f"\n---\n*Auto-generated by Source Scraper*\n")
-
-        self.logger.info(f"Discovery report saved: {report_path}")
-
-    def run_discovery(self):
-        """Main discovery process"""
-        if not self.config['enabled']:
-            self.logger.info("Discovery is disabled in configuration")
-            return
-
-        self.logger.info("=== Starting Channel Discovery ===")
-
-        all_discovered = []
-
-        for source in self.config['sources']:
-            try:
-                if source['type'] == 'm3u_playlist':
-                    channels = self.discover_from_m3u_url(source)
-                elif source['type'] == 'github_directory':
-                    channels = self.discover_from_github_directory(source)
-                else:
-                    self.logger.warning(f"Unknown source type: {source['type']}")
-                    continue
-
-                all_discovered.extend(channels)
-
-                # Rate limiting
-                import time
-                time.sleep(self.config['rate_limiting']['delay_between_requests'])
-
-            except Exception as e:
-                self.logger.error(f"Error processing source {source['name']}: {e}")
-                continue
-
-        # Apply filters
-        filtered_channels = self.filter_channels(all_discovered)
-        unique_channels = self.deduplicate_with_existing(filtered_channels)
-
-        # Add to bulk import
-        self.append_to_bulk_import(unique_channels)
-
-        # Generate report
-        self.generate_discovery_report()
-
-        self.logger.info(f"=== Discovery Complete: {len(unique_channels)} new channels added ===")
-
-if __name__ == "__main__":
-    scraper = SourceScraper()
-    scraper.run_discovery()
\ No newline at end of file
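For anyone reimplementing what this removed script did, its core technique was the two-line M3U convention: each `#EXTINF:` metadata line is paired with the stream URL on the line that follows it, and an entry is only complete once both have been seen. A minimal standalone sketch of that pairing step (the function name `parse_m3u_pairs` is illustrative, not from the original):

```python
def parse_m3u_pairs(content: str) -> list[str]:
    """Pair each #EXTINF metadata line with the stream URL on the next line."""
    channels = []
    current_extinf = None
    for raw_line in content.split('\n'):
        line = raw_line.strip()
        if line.startswith('#EXTINF:'):
            current_extinf = line  # remember the metadata line
        elif line.startswith('http') and current_extinf:
            channels.append(f"{current_extinf}\n{line}")  # complete two-line entry
            current_extinf = None  # reset so a stray URL is not paired twice
    return channels
```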
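Deduplication was keyed on the stream URL rather than the channel name: the script pulled every `Stream URL = ...` value out of `channels.txt` and dropped any discovered entry whose URL was already recorded. A sketch of that check, assuming the same `channels.txt` line format (both helper names are hypothetical):

```python
import re

def load_existing_urls(path: str = 'channels.txt') -> set[str]:
    """Collect stream URLs already recorded in channels.txt."""
    try:
        with open(path, 'r', encoding='utf-8') as f:
            return {u.strip() for u in re.findall(r'Stream URL\s*=\s*(.+)', f.read())}
    except FileNotFoundError:
        return set()

def is_new_channel(channel: str, existing_urls: set[str]) -> bool:
    """Keep a two-line entry (metadata, then URL) only if its URL is unseen."""
    lines = channel.split('\n')
    return len(lines) >= 2 and lines[1].strip() not in existing_urls
```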
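Appending to `bulk_import.m3u` had one subtlety worth preserving: the `#EXTM3U` header must appear exactly once, at the top, whether the file is missing, empty, or already populated, which is why the script rewrote the file instead of blindly appending. A sketch under those same assumptions (the function name is illustrative):

```python
import os

def append_to_playlist(channels: list[str], path: str = 'bulk_import.m3u') -> None:
    """Add entries to the bulk-import playlist, keeping a single #EXTM3U header."""
    if not channels:
        return  # nothing to add
    existing = ''
    if os.path.exists(path):
        with open(path, 'r', encoding='utf-8') as f:
            existing = f.read().strip()
    if not existing or existing == '#EXTM3U':
        existing = '#EXTM3U'  # fresh or header-only file: start from the header
    with open(path, 'w', encoding='utf-8') as f:
        f.write(existing + '\n' + '\n'.join(channels) + '\n')
```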