diff --git a/scripts/source_scraper.py b/scripts/source_scraper.py
new file mode 100644
index 0000000..6c6534c
--- /dev/null
+++ b/scripts/source_scraper.py
@@ -0,0 +1,287 @@
+# scripts/source_scraper.py
+"""
+Automated Channel Discovery for IPTV Playlist Generator
+Integrates with the existing playlist-generation pipeline
+"""
+
+import json
+import logging
+import os
+import re
+import time
+from datetime import datetime
+from typing import Dict, List
+
+import requests
+
+
+class SourceScraper:
+    def __init__(self):
+        self.setup_logging()
+        self.load_config()
+        self.discovered_channels = []
+        self.source_stats = {}
+
+    def setup_logging(self):
+        """Set up logging consistent with the existing system"""
+        logging.basicConfig(
+            level=logging.INFO,
+            format='%(asctime)s - %(levelname)s - %(message)s'
+        )
+        self.logger = logging.getLogger(__name__)
+
+    def load_config(self):
+        """Load the discovery-sources configuration, creating a default if missing"""
+        try:
+            with open('config/discovery_sources.json', 'r') as f:
+                self.config = json.load(f)
+        except FileNotFoundError:
+            # Create a default config if it doesn't exist
+            self.config = {
+                "enabled": True,
+                "sources": [
+                    {
+                        "name": "IPTV-Org Main",
+                        "url": "https://raw.githubusercontent.com/iptv-org/iptv/master/streams/",
+                        "type": "github_directory",
+                        "country_filter": ["us", "uk", "ca", "au"]
+                    },
+                    {
+                        "name": "Free-TV Collection",
+                        "url": "https://raw.githubusercontent.com/Free-TV/IPTV/master/playlist.m3u8",
+                        "type": "m3u_playlist",
+                        "quality_filter": ["hd", "fhd", "4k"]
+                    }
+                ],
+                "filters": {
+                    "min_quality": "sd",
+                    "exclude_adult": True,
+                    "max_channels_per_source": 100,
+                    "require_country_detection": False
+                },
+                "rate_limiting": {
+                    "delay_between_requests": 1.0,
+                    "max_retries": 3
+                }
+            }
+            os.makedirs('config', exist_ok=True)
+            with open('config/discovery_sources.json', 'w') as f:
+                json.dump(self.config, f, indent=2)
+
+    def parse_m3u_content(self, content: str) -> List[str]:
+        """Parse raw M3U text into paired EXTINF/URL channel entries"""
+        channels = []
+        current_extinf = None
+        for line in content.split('\n'):
+            line = line.strip()
+            if line.startswith('#EXTINF:'):
+                current_extinf = line
+            elif line.startswith('http') and current_extinf:
+                # A complete entry: metadata line followed by its stream URL
+                channels.append(f"{current_extinf}\n{line}")
+                current_extinf = None
+
+        return channels
+
+    def discover_from_m3u_url(self, source_info: Dict) -> List[str]:
+        """Discover channels from a single M3U playlist URL"""
+        try:
+            self.logger.info(f"Discovering from M3U: {source_info['name']}")
+
+            response = requests.get(source_info['url'], timeout=30)
+            response.raise_for_status()
+
+            channels = self.parse_m3u_content(response.text)
+
+            self.logger.info(f"Found {len(channels)} channels from {source_info['name']}")
+            self.source_stats[source_info['name']] = len(channels)
+
+            return channels[:self.config['filters']['max_channels_per_source']]
+
+        except Exception as e:
+            self.logger.error(f"Error discovering from {source_info['name']}: {e}")
+            return []
+
+    def discover_from_github_directory(self, source_info: Dict) -> List[str]:
+        """Discover channels from per-country playlists in a GitHub directory"""
+        try:
+            self.logger.info(f"Discovering from GitHub: {source_info['name']}")
+
+            base_url = source_info['url']
+            channels = []
+
+            # Try the country codes configured for this source, falling back to a common set
+            country_codes = source_info.get('country_filter', ['us', 'uk', 'ca', 'de', 'fr'])
+
+            for country in country_codes:
+                try:
+                    url = f"{base_url}{country}.m3u"
+                    response = requests.get(url, timeout=15)
+
+                    if response.status_code == 200:
+                        found = self.parse_m3u_content(response.text)
+                        channels.extend(found)
+                        self.logger.info(f"Found {len(found)} channels for {country}")
+
+                except Exception as e:
+                    self.logger.debug(f"No channels found for {country}: {e}")
+                    continue
+
+            self.source_stats[source_info['name']] = len(channels)
+            return channels[:self.config['filters']['max_channels_per_source']]
+
+        except Exception as e:
+            self.logger.error(f"Error discovering from GitHub {source_info['name']}: {e}")
+            return []
+
+    def filter_channels(self, channels: List[str]) -> List[str]:
+        """Apply content filters (currently adult-content exclusion)"""
+        if not self.config['filters']['exclude_adult']:
+            return channels
+
+        # Load adult keywords from the existing patterns config, with a fallback list
+        try:
+            with open('config/patterns.json', 'r') as f:
+                patterns = json.load(f)
+            adult_keywords = patterns.get('adult_keywords', [])
+        except (FileNotFoundError, json.JSONDecodeError):
+            adult_keywords = ['xxx', 'adult', 'porn', '+18']
+
+        filtered = []
+        for channel in channels:
+            # Keep the channel only if no adult keyword appears in its entry
+            channel_lower = channel.lower()
+            if not any(keyword in channel_lower for keyword in adult_keywords):
+                filtered.append(channel)
+
+        self.logger.info(f"Filtered {len(channels) - len(filtered)} adult channels")
+        return filtered
+
+    def deduplicate_with_existing(self, new_channels: List[str]) -> List[str]:
+        """Remove channels whose stream URL already appears in channels.txt"""
+        if not os.path.exists('channels.txt'):
+            return new_channels
+
+        # Read existing channel URLs
+        existing_urls = set()
+        try:
+            with open('channels.txt', 'r', encoding='utf-8') as f:
+                content = f.read()
+            # Extract stream URLs from the existing channels.txt entries
+            url_pattern = r'Stream URL\s*=\s*(.+)'
+            existing_urls = {url.strip() for url in re.findall(url_pattern, content)}
+        except Exception as e:
+            self.logger.warning(f"Could not read existing channels: {e}")
+
+        # Filter out duplicates
+        unique_channels = []
+        for channel in new_channels:
+            lines = channel.split('\n')
+            if len(lines) >= 2:
+                url = lines[1].strip()
+                if url not in existing_urls:
+                    unique_channels.append(channel)
+
+        self.logger.info(f"Removed {len(new_channels) - len(unique_channels)} duplicate channels")
+        return unique_channels
+
+    def append_to_bulk_import(self, channels: List[str]):
+        """Append discovered channels to bulk_import.m3u"""
+        if not channels:
+            self.logger.info("No new channels to add")
+            return
+
+        # Read existing bulk_import content
+        existing_content = ""
+        if os.path.exists('bulk_import.m3u'):
+            with open('bulk_import.m3u', 'r', encoding='utf-8') as f:
+                existing_content = f.read().strip()
+
+        # If the file is empty, start from the M3U header
+        if not existing_content:
+            existing_content = '#EXTM3U'
+
+        # Rewrite the file with the new channels appended
+        with open('bulk_import.m3u', 'w', encoding='utf-8') as f:
+            f.write(existing_content)
+            if not existing_content.endswith('\n'):
+                f.write('\n')
+            f.write('\n'.join(channels))
+            f.write('\n')
+
+        self.logger.info(f"Added {len(channels)} new channels to bulk_import.m3u")
+
+    def generate_discovery_report(self):
+        """Write a markdown report for this discovery session"""
+        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+        report_path = f"reports/daily/discovery_report_{timestamp}.md"
+
+        os.makedirs('reports/daily', exist_ok=True)
+
+        total_discovered = sum(self.source_stats.values())
+
+        with open(report_path, 'w', encoding='utf-8') as f:
+            f.write("# Channel Discovery Report\n")
+            f.write(f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
+            f.write("## Summary\n")
+            f.write(f"- **Total Channels Discovered:** {total_discovered}\n")
+            f.write(f"- **Sources Checked:** {len(self.config['sources'])}\n")
+            f.write(f"- **Active Sources:** {len(self.source_stats)}\n\n")
+            f.write("## Source Breakdown\n")
+
+            for source_name, count in self.source_stats.items():
+                f.write(f"- **{source_name}:** {count} channels\n")
+
+            f.write("\n## Configuration\n")
+            f.write(f"- **Max per source:** {self.config['filters']['max_channels_per_source']}\n")
+            f.write(f"- **Adult filter:** {'Enabled' if self.config['filters']['exclude_adult'] else 'Disabled'}\n")
+            f.write(f"- **Quality filter:** {self.config['filters']['min_quality']}\n")
+            f.write("\n---\n*Auto-generated by Source Scraper*\n")
+
+        self.logger.info(f"Discovery report saved: {report_path}")
+
+    def run_discovery(self):
+        """Main discovery process"""
+        if not self.config['enabled']:
+            self.logger.info("Discovery is disabled in configuration")
+            return
+
+        self.logger.info("=== Starting Channel Discovery ===")
+
+        all_discovered = []
+
+        for source in self.config['sources']:
+            try:
+                if source['type'] == 'm3u_playlist':
+                    channels = self.discover_from_m3u_url(source)
+                elif source['type'] == 'github_directory':
+                    channels = self.discover_from_github_directory(source)
+                else:
+                    self.logger.warning(f"Unknown source type: {source['type']}")
+                    continue
+
+                all_discovered.extend(channels)
+
+                # Rate limiting between sources
+                time.sleep(self.config['rate_limiting']['delay_between_requests'])
+
+            except Exception as e:
+                self.logger.error(f"Error processing source {source['name']}: {e}")
+                continue
+
+        # Apply filters, then drop anything already present in channels.txt
+        filtered_channels = self.filter_channels(all_discovered)
+        unique_channels = self.deduplicate_with_existing(filtered_channels)
+
+        # Queue the survivors for the existing bulk-import step
+        self.append_to_bulk_import(unique_channels)
+
+        # Generate the session report
+        self.generate_discovery_report()
+
+        self.logger.info(f"=== Discovery Complete: {len(unique_channels)} new channels added ===")
+
+
+if __name__ == "__main__":
+    scraper = SourceScraper()
+    scraper.run_discovery()
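Reviewer note: a minimal usage sketch of the class added above. The import path and the sample EXTINF entry are illustrative assumptions (they presume the script is run from the repository root, not part of this diff); instantiating `SourceScraper` writes a default `config/discovery_sources.json` if one is missing, and `run_discovery()` performs live HTTP requests against the configured sources.

```python
# Minimal usage sketch; assumes scripts/source_scraper.py is on sys.path.
from source_scraper import SourceScraper

scraper = SourceScraper()  # creates config/discovery_sources.json if absent

# The parser pairs each #EXTINF metadata line with the stream URL that
# follows it; this sample playlist is made up for illustration.
sample = "#EXTM3U\n#EXTINF:-1,Example News HD\nhttp://example.com/live/news.m3u8\n"
assert scraper.parse_m3u_content(sample) == [
    "#EXTINF:-1,Example News HD\nhttp://example.com/live/news.m3u8"
]

# Full pipeline: discover -> filter -> dedupe -> append to bulk_import.m3u
# -> report. Requires network access to the configured remote sources.
scraper.run_discovery()
```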