From 65a31644be7fd90f212f0574f9c0681b31f87d01 Mon Sep 17 00:00:00 2001 From: stoney420 Date: Fri, 27 Jun 2025 23:30:14 +0200 Subject: [PATCH] Add scripts/file_manager.py --- scripts/file_manager.py | 125 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 125 insertions(+) create mode 100644 scripts/file_manager.py diff --git a/scripts/file_manager.py b/scripts/file_manager.py new file mode 100644 index 0000000..a50c388 --- /dev/null +++ b/scripts/file_manager.py @@ -0,0 +1,125 @@ +""" +File Manager - Handles file operations, backups, and channel loading +""" + +import os +import re +import shutil +import logging +from datetime import datetime +from pathlib import Path +from typing import Dict, List, Optional + +class FileManager: + """Manage file operations with backup and rotation.""" + + def __init__(self, config): + self.config = config + self.logger = logging.getLogger(__name__) + self.backup_dir = Path("backups") + self.backup_dir.mkdir(exist_ok=True) + + def create_backup(self, file_path: str) -> Optional[Path]: + """Create timestamped backup with rotation.""" + if not self.config.settings.get('create_backup', True): + return None + + file_path = Path(file_path) + if not file_path.exists(): + return None + + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + backup_path = self.backup_dir / f"{file_path.stem}_{timestamp}{file_path.suffix}" + + try: + shutil.copy2(file_path, backup_path) + self.logger.info(f"Created backup: {backup_path}") + self._cleanup_old_backups(file_path.stem) + return backup_path + except Exception as e: + self.logger.error(f"Failed to create backup: {e}") + return None + + def _cleanup_old_backups(self, base_name: str): + """Remove old backups, keeping only the most recent ones.""" + max_backups = self.config.settings.get('max_backups', 5) + + backup_files = sorted( + [f for f in self.backup_dir.glob(f"{base_name}_*") if f.is_file()], + key=lambda x: x.stat().st_mtime, + reverse=True + ) + + for old_backup in backup_files[max_backups:]: + try: + old_backup.unlink() + self.logger.debug(f"Removed old backup: {old_backup}") + except Exception as e: + self.logger.warning(f"Could not remove old backup {old_backup}: {e}") + + def load_all_channels(self) -> List[Dict]: + """Load all channels from the channels file.""" + if not os.path.exists(self.config.channels_file): + self.logger.info("No channels.txt file found") + return [] + + try: + with open(self.config.channels_file, 'r', encoding='utf-8') as f: + content = f.read() + + channel_blocks = re.split(r'\n\s*\n+', content.strip()) + channels = [] + + for block in channel_blocks: + if block.strip(): + channel = self._parse_channel_block(block) + if channel: + channels.append(channel) + + self.logger.info(f"Loaded {len(channels)} channels from file") + return channels + + except Exception as e: + self.logger.error(f"Error loading channels: {e}") + return [] + + def _parse_channel_block(self, block: str) -> Optional[Dict]: + """Parse a channel block from channels.txt.""" + channel_data = {} + lines = block.strip().split('\n') + + for line in lines: + if '=' in line: + key, value = line.split('=', 1) + channel_data[key.strip()] = value.strip() + + return channel_data if channel_data else None + + def save_channels(self, channels: List[Dict]) -> bool: + """Save channels to the channels.txt file.""" + try: + # Create backup first + self.create_backup(self.config.channels_file) + + with open(self.config.channels_file, 'w', encoding='utf-8') as f: + for i, channel in enumerate(channels): + if i > 0: + f.write("\n\n") + f.write(self._convert_to_channels_txt_block(channel)) + + self.logger.info(f"Saved {len(channels)} channels to file") + return True + + except Exception as e: + self.logger.error(f"Error saving channels: {e}") + return False + + def _convert_to_channels_txt_block(self, channel_data: Dict) -> str: + """Convert to channels.txt format.""" + block = [] + block.append(f"Group = {channel_data.get('Group', 'Uncategorized')}") + block.append(f"Stream name = {channel_data.get('Stream name', 'Unknown Channel')}") + block.append(f"Logo = {channel_data.get('Logo', '')}") + block.append(f"EPG id = {channel_data.get('EPG id', '')}") + block.append(f"Stream URL = {channel_data.get('Stream URL', '')}") + return "\n".join(block) \ No newline at end of file