From 335bbbcbeb0878e1ff70b1e5e5c272d32e1d746d Mon Sep 17 00:00:00 2001 From: stoney420 Date: Sat, 28 Jun 2025 00:03:36 +0200 Subject: [PATCH] Update scripts/file_manager.py --- scripts/file_manager.py | 89 ++++++++++++++++++++++++++++++++++------- 1 file changed, 75 insertions(+), 14 deletions(-) diff --git a/scripts/file_manager.py b/scripts/file_manager.py index a50c388..bb9e6ad 100644 --- a/scripts/file_manager.py +++ b/scripts/file_manager.py @@ -1,5 +1,5 @@ """ -File Manager - Handles file operations, backups, and channel loading +File Manager - FIXED to work with correct paths from scripts folder """ import os @@ -11,30 +11,38 @@ from pathlib import Path from typing import Dict, List, Optional class FileManager: - """Manage file operations with backup and rotation.""" + """Manage file operations with backup and rotation - FIXED paths.""" def __init__(self, config): self.config = config self.logger = logging.getLogger(__name__) - self.backup_dir = Path("backups") + + # FIXED: Get root directory and create backups there + script_dir = Path(__file__).parent + root_dir = script_dir.parent + self.backup_dir = root_dir / "backups" self.backup_dir.mkdir(exist_ok=True) + + self.logger.info(f"FileManager initialized - root: {root_dir}") def create_backup(self, file_path: str) -> Optional[Path]: """Create timestamped backup with rotation.""" if not self.config.settings.get('create_backup', True): return None - file_path = Path(file_path) - if not file_path.exists(): + # Use the full path from config + full_path = Path(file_path) + if not full_path.exists(): + self.logger.info(f"No file to backup: {file_path}") return None timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') - backup_path = self.backup_dir / f"{file_path.stem}_{timestamp}{file_path.suffix}" + backup_path = self.backup_dir / f"{full_path.stem}_{timestamp}{full_path.suffix}" try: - shutil.copy2(file_path, backup_path) + shutil.copy2(full_path, backup_path) self.logger.info(f"Created backup: {backup_path}") - self._cleanup_old_backups(file_path.stem) + self._cleanup_old_backups(full_path.stem) return backup_path except Exception as e: self.logger.error(f"Failed to create backup: {e}") @@ -59,14 +67,22 @@ class FileManager: def load_all_channels(self) -> List[Dict]: """Load all channels from the channels file.""" - if not os.path.exists(self.config.channels_file): - self.logger.info("No channels.txt file found") + channels_file = self.config.channels_file + + self.logger.info(f"Attempting to load channels from: {channels_file}") + + if not os.path.exists(channels_file): + self.logger.info(f"No channels file found at: {channels_file}") return [] try: - with open(self.config.channels_file, 'r', encoding='utf-8') as f: + with open(channels_file, 'r', encoding='utf-8') as f: content = f.read() + if not content.strip(): + self.logger.info(f"Channels file is empty: {channels_file}") + return [] + channel_blocks = re.split(r'\n\s*\n+', content.strip()) channels = [] @@ -97,23 +113,68 @@ class FileManager: def save_channels(self, channels: List[Dict]) -> bool: """Save channels to the channels.txt file.""" + channels_file = self.config.channels_file + + self.logger.info(f"Attempting to save {len(channels)} channels to: {channels_file}") + try: # Create backup first - self.create_backup(self.config.channels_file) + self.create_backup(channels_file) - with open(self.config.channels_file, 'w', encoding='utf-8') as f: + # Write all channels + with open(channels_file, 'w', encoding='utf-8') as f: for i, channel in enumerate(channels): if i > 0: f.write("\n\n") f.write(self._convert_to_channels_txt_block(channel)) - self.logger.info(f"Saved {len(channels)} channels to file") + self.logger.info(f"Successfully saved {len(channels)} channels to {channels_file}") + + # Verify the file was written + if os.path.exists(channels_file): + size = os.path.getsize(channels_file) + self.logger.info(f"Verified: {channels_file} now has {size} bytes") + return True except Exception as e: self.logger.error(f"Error saving channels: {e}") return False + def append_channels(self, new_channels: List[Dict]) -> bool: + """Append new channels to existing channels file.""" + if not new_channels: + self.logger.info("No new channels to append") + return True + + channels_file = self.config.channels_file + + self.logger.info(f"Appending {len(new_channels)} channels to: {channels_file}") + + try: + # Check if file exists and has content + file_exists = os.path.exists(channels_file) and os.path.getsize(channels_file) > 0 + + with open(channels_file, 'a', encoding='utf-8') as f: + for i, channel in enumerate(new_channels): + # Add separator if file has content or this isn't the first new channel + if i > 0 or file_exists: + f.write("\n\n") + f.write(self._convert_to_channels_txt_block(channel)) + + self.logger.info(f"Successfully appended {len(new_channels)} channels") + + # Verify the file was updated + if os.path.exists(channels_file): + size = os.path.getsize(channels_file) + self.logger.info(f"Verified: {channels_file} now has {size} bytes") + + return True + + except Exception as e: + self.logger.error(f"Error appending channels: {e}") + return False + def _convert_to_channels_txt_block(self, channel_data: Dict) -> str: """Convert to channels.txt format.""" block = []