# scripts/source_scraper.py
"""
Automated Channel Discovery for IPTV Playlist Generator

Integrates seamlessly with the existing architecture.
"""

import json
import logging
import os
import re
import time
from datetime import datetime
from typing import Dict, List

import requests

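# NOTE: The discovery methods below assume the standard extended-M3U pairing,
# where each stream is an #EXTINF metadata line followed by its URL, e.g.
# (illustrative attributes, not taken from any specific source):
#
#   #EXTINF:-1 tvg-id="example" group-title="News",Example Channel
#   https://example.com/stream.m3u8
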
class SourceScraper:
    def __init__(self):
        self.setup_logging()
        self.load_config()
        self.discovered_channels = []
        self.source_stats = {}

    def setup_logging(self):
        """Set up logging consistent with the existing system."""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)

    def load_config(self):
        """Load the discovery sources configuration."""
        try:
            with open('config/discovery_sources.json', 'r') as f:
                self.config = json.load(f)
        except FileNotFoundError:
            # Create a default config if it doesn't exist
            self.config = {
                "enabled": True,
                "sources": [
                    {
                        "name": "IPTV-Org Main",
                        "url": "https://raw.githubusercontent.com/iptv-org/iptv/master/streams/",
                        "type": "github_directory",
                        "country_filter": ["us", "uk", "ca", "au"]
                    },
                    {
                        "name": "Free-TV Collection",
                        "url": "https://raw.githubusercontent.com/Free-TV/IPTV/master/playlist.m3u8",
                        "type": "m3u_playlist",
                        "quality_filter": ["hd", "fhd", "4k"]
                    }
                ],
                "filters": {
                    "min_quality": "sd",
                    "exclude_adult": True,
                    "max_channels_per_source": 100,
                    "require_country_detection": False
                },
                "rate_limiting": {
                    "delay_between_requests": 1.0,
                    "max_retries": 3
                }
            }
            os.makedirs('config', exist_ok=True)
            with open('config/discovery_sources.json', 'w') as f:
                json.dump(self.config, f, indent=2)

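    # Consolidated M3U parser: both discover_* methods below originally
    # repeated this loop inline; behavior is unchanged.
    def _parse_m3u_channels(self, content: str) -> List[str]:
        """Pair each #EXTINF metadata line with the stream URL that follows it."""
        channels = []
        current_extinf = None
        for line in content.split('\n'):
            line = line.strip()
            if line.startswith('#EXTINF:'):
                current_extinf = line
            elif line.startswith('http') and current_extinf:
                # We have a complete channel entry: metadata line + URL line
                channels.append(f"{current_extinf}\n{line}")
                current_extinf = None
        return channels
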
    def discover_from_m3u_url(self, source_info: Dict) -> List[str]:
        """Discover channels from an M3U playlist URL."""
        try:
            self.logger.info(f"Discovering from M3U: {source_info['name']}")

            response = requests.get(source_info['url'], timeout=30)
            response.raise_for_status()

            channels = self._parse_m3u_channels(response.text)

            self.logger.info(f"Found {len(channels)} channels from {source_info['name']}")
            self.source_stats[source_info['name']] = len(channels)

            return channels[:self.config['filters']['max_channels_per_source']]

        except Exception as e:
            self.logger.error(f"Error discovering from {source_info['name']}: {e}")
            return []

    def discover_from_github_directory(self, source_info: Dict) -> List[str]:
        """Discover channels from a GitHub directory structure."""
        try:
            self.logger.info(f"Discovering from GitHub: {source_info['name']}")

            base_url = source_info['url']
            channels = []

            # Try common country codes from the existing playlist patterns
            country_codes = source_info.get('country_filter', ['us', 'uk', 'ca', 'de', 'fr'])

            for country in country_codes:
                try:
                    url = f"{base_url}{country}.m3u"
                    response = requests.get(url, timeout=15)

                    if response.status_code == 200:
                        country_channels = self._parse_m3u_channels(response.text)
                        channels.extend(country_channels)
                        self.logger.info(f"Found {len(country_channels)} channels for {country}")

                except Exception as e:
                    self.logger.debug(f"No channels found for {country}: {e}")
                    continue

            self.source_stats[source_info['name']] = len(channels)
            return channels[:self.config['filters']['max_channels_per_source']]

        except Exception as e:
            self.logger.error(f"Error discovering from GitHub {source_info['name']}: {e}")
            return []

    def filter_channels(self, channels: List[str]) -> List[str]:
        """Apply content filters (currently adult-content exclusion)."""
        if not self.config['filters']['exclude_adult']:
            return channels

        # Load adult keywords from the existing patterns config
        try:
            with open('config/patterns.json', 'r') as f:
                patterns = json.load(f)
            adult_keywords = patterns.get('adult_keywords', [])
        except (FileNotFoundError, json.JSONDecodeError):
            adult_keywords = ['xxx', 'adult', 'porn', '+18']

        filtered = []
        for channel in channels:
            # Keep the channel only if no adult keyword appears in its entry
            channel_lower = channel.lower()
            if not any(keyword in channel_lower for keyword in adult_keywords):
                filtered.append(channel)

        self.logger.info(f"Filtered {len(channels) - len(filtered)} adult channels")
        return filtered

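    # Example shape of config/patterns.json assumed by filter_channels
    # (illustrative; the hard-coded fallback list above mirrors it):
    #
    #   { "adult_keywords": ["xxx", "adult", "porn", "+18"] }
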
    def deduplicate_with_existing(self, new_channels: List[str]) -> List[str]:
        """Remove channels that already exist in channels.txt."""
        if not os.path.exists('channels.txt'):
            return new_channels

        # Read existing channel URLs
        existing_urls = set()
        try:
            with open('channels.txt', 'r', encoding='utf-8') as f:
                content = f.read()
            # Extract URLs from the existing channels.txt
            url_pattern = r'Stream URL\s*=\s*(.+)'
            existing_urls = set(re.findall(url_pattern, content))
        except Exception as e:
            self.logger.warning(f"Could not read existing channels: {e}")

        # Filter out duplicates
        unique_channels = []
        for channel in new_channels:
            lines = channel.split('\n')
            if len(lines) >= 2:
                url = lines[1].strip()
                if url not in existing_urls:
                    unique_channels.append(channel)

        self.logger.info(f"Removed {len(new_channels) - len(unique_channels)} duplicate channels")
        return unique_channels

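    # The regex above assumes channels.txt stores one "Stream URL = <url>"
    # line per channel, e.g. (illustrative entry):
    #
    #   Stream URL = https://example.com/stream.m3u8
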
    def append_to_bulk_import(self, channels: List[str]):
        """Append discovered channels to bulk_import.m3u."""
        if not channels:
            self.logger.info("No new channels to add")
            return

        # Read existing bulk_import content
        existing_content = ""
        if os.path.exists('bulk_import.m3u'):
            with open('bulk_import.m3u', 'r', encoding='utf-8') as f:
                existing_content = f.read().strip()

        # If the file is empty or only has the header, start fresh
        if not existing_content or existing_content == '#EXTM3U':
            existing_content = '#EXTM3U'

        # Append new channels
        with open('bulk_import.m3u', 'w', encoding='utf-8') as f:
            f.write(existing_content)
            if not existing_content.endswith('\n'):
                f.write('\n')
            f.write('\n'.join(channels))
            f.write('\n')

        self.logger.info(f"Added {len(channels)} new channels to bulk_import.m3u")

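    # bulk_import.m3u is presumably picked up by the main playlist generator
    # on its next run; this script only stages newly discovered channels.
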
    def generate_discovery_report(self):
        """Generate a discovery session report."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        report_path = f"reports/daily/discovery_report_{timestamp}.md"

        os.makedirs('reports/daily', exist_ok=True)

        total_discovered = sum(self.source_stats.values())

        with open(report_path, 'w', encoding='utf-8') as f:
            f.write("# Channel Discovery Report\n")
            f.write(f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
            f.write("## Summary\n")
            f.write(f"- **Total Channels Discovered:** {total_discovered}\n")
            f.write(f"- **Sources Checked:** {len(self.config['sources'])}\n")
            f.write(f"- **Active Sources:** {len(self.source_stats)}\n\n")
            f.write("## Source Breakdown\n")

            for source_name, count in self.source_stats.items():
                f.write(f"- **{source_name}:** {count} channels\n")

            f.write("\n## Configuration\n")
            f.write(f"- **Max per source:** {self.config['filters']['max_channels_per_source']}\n")
            f.write(f"- **Adult filter:** {'Enabled' if self.config['filters']['exclude_adult'] else 'Disabled'}\n")
            f.write(f"- **Quality filter:** {self.config['filters']['min_quality']}\n")
            f.write("\n---\n*Auto-generated by Source Scraper*\n")

        self.logger.info(f"Discovery report saved: {report_path}")

    def run_discovery(self):
        """Main discovery process."""
        if not self.config['enabled']:
            self.logger.info("Discovery is disabled in configuration")
            return

        self.logger.info("=== Starting Channel Discovery ===")

        all_discovered = []

        for source in self.config['sources']:
            try:
                if source['type'] == 'm3u_playlist':
                    channels = self.discover_from_m3u_url(source)
                elif source['type'] == 'github_directory':
                    channels = self.discover_from_github_directory(source)
                else:
                    self.logger.warning(f"Unknown source type: {source['type']}")
                    continue

                all_discovered.extend(channels)

                # Rate limiting between sources
                time.sleep(self.config['rate_limiting']['delay_between_requests'])

            except Exception as e:
                self.logger.error(f"Error processing source {source['name']}: {e}")
                continue

        # Apply filters
        filtered_channels = self.filter_channels(all_discovered)
        unique_channels = self.deduplicate_with_existing(filtered_channels)

        # Add to bulk import
        self.append_to_bulk_import(unique_channels)

        # Generate report
        self.generate_discovery_report()

        self.logger.info(f"=== Discovery Complete: {len(unique_channels)} new channels added ===")


if __name__ == "__main__":
    scraper = SourceScraper()
    scraper.run_discovery()
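
# Run manually with:
#   python scripts/source_scraper.py
# In this repo it is presumably invoked on a schedule (e.g. by the
# "Generate M3U Playlist with Auto-Organization" workflow); edit
# config/discovery_sources.json to enable/disable or tune sources.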